Merge branch 'master' into feature/profiler-v4

This commit is contained in:
gmega 2024-10-23 16:46:19 -03:00
commit f85a3b43c2
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
33 changed files with 3162 additions and 849 deletions

View File

@ -18,27 +18,39 @@ jobs:
target:
- os: linux
cpu: amd64
- os: linux-gcc-14 # this is to use ubuntu 24 and install gcc 14. Must be removed when ubuntu-latest is 24.04
cpu: amd64
- os: linux
cpu: i386
- os: macos
cpu: amd64
- os: macos
cpu: arm64
- os: windows
cpu: amd64
#- os: windows
#cpu: i386
branch: [version-1-6, version-2-0, devel]
include:
- target:
os: linux
builder: ubuntu-20.04
builder: ubuntu-latest
shell: bash
- target:
os: linux-gcc-14 # this is to use ubuntu 24 and install gcc 14. Must be removed when ubuntu-latest is 24.04
builder: ubuntu-24.04
shell: bash
- target:
os: macos
builder: macos-11
cpu: amd64
builder: macos-13
shell: bash
- target:
os: macos
cpu: arm64
builder: macos-latest
shell: bash
- target:
os: windows
builder: windows-2019
builder: windows-latest
shell: msys2 {0}
defaults:
@ -50,7 +62,7 @@ jobs:
continue-on-error: ${{ matrix.branch == 'devel' }}
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Enable debug verbosity
if: runner.debug == '1'
@ -62,8 +74,8 @@ jobs:
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
sudo dpkg --add-architecture i386
sudo apt-fast update -qq
sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \
sudo apt-get update -qq
sudo DEBIAN_FRONTEND='noninteractive' apt-get install \
--no-install-recommends -yq gcc-multilib g++-multilib \
libssl-dev:i386
mkdir -p external/bin
@ -102,7 +114,7 @@ jobs:
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: external/dlls-${{ matrix.target.cpu }}
key: 'dlls-${{ matrix.target.cpu }}'
@ -126,6 +138,8 @@ jobs:
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
elif [[ '${{ matrix.target.cpu }}' == 'arm64' ]]; then
PLATFORM=arm64
else
PLATFORM=x86
fi
@ -157,6 +171,15 @@ jobs:
bash build_nim.sh nim csources dist/nimble NimBinaries
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
- name: Use gcc 14 # Must be removed when ubuntu-latest is 24.04 and gcc 14 is the default
if : ${{ matrix.target.os == 'linux-gcc-14' }}
run: |
# Add GCC-14 to alternatives
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-14 14
# Set GCC-14 as the default
sudo update-alternatives --set gcc /usr/bin/gcc-14
- name: Run tests
run: |
nim --version

View File

@ -15,7 +15,7 @@ jobs:
continue-on-error: true
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: true
- uses: actions-rs/install@v0.1
@ -41,7 +41,7 @@ jobs:
- uses: jiro4989/setup-nim-action@v1
with:
nim-version: '1.6.16'
nim-version: '1.6.20'
- name: Generate doc
run: |

View File

@ -1,7 +1,7 @@
mode = ScriptMode.Verbose
packageName = "chronos"
version = "4.0.0"
version = "4.0.3"
author = "Status Research & Development GmbH"
description = "Networking framework with async/await support"
license = "MIT or Apache License 2.0"
@ -10,7 +10,7 @@ skipDirs = @["tests"]
requires "nim >= 1.6.16",
"results",
"stew",
"bearssl",
"bearssl >= 0.2.5",
"httputils",
"unittest2"
@ -20,6 +20,7 @@ let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
let flags = getEnv("NIMFLAGS", "") # Extra flags for the compiler
let verbose = getEnv("V", "") notin ["", "0"]
let platform = getEnv("PLATFORM", "")
let testArguments =
when defined(windows):
[
@ -53,22 +54,31 @@ task examples, "Build examples":
build "--threads:on", file
task test, "Run all tests":
for args in testArguments:
# First run tests with `refc` memory manager.
run args & " --mm:refc", "tests/testall"
if (NimMajor, NimMinor) > (1, 6):
run args & " --mm:orc", "tests/testall"
task test_v3_compat, "Run all tests in v3 compatibility mode":
for args in testArguments:
if (NimMajor, NimMinor) > (1, 6):
# First run tests with `refc` memory manager.
run args & " --mm:refc", "tests/testall"
run args, "tests/testall"
run args & " --mm:refc -d:chronosHandleException", "tests/testall"
run args & " -d:chronosHandleException", "tests/testall"
task test_libbacktrace, "test with libbacktrace":
let allArgs = @[
"-d:release --debugger:native -d:chronosStackTrace -d:nimStackTraceOverride --import:libbacktrace",
]
if platform != "x86":
let allArgs = @[
"-d:release --debugger:native -d:chronosStackTrace -d:nimStackTraceOverride --import:libbacktrace",
]
for args in allArgs:
if (NimMajor, NimMinor) > (1, 6):
for args in allArgs:
# First run tests with `refc` memory manager.
run args & " --mm:refc", "tests/testall"
run args, "tests/testall"
if (NimMajor, NimMinor) > (1, 6):
run args & " --mm:orc", "tests/testall"
task test_profiler, "test with profiler instrumentation":
var allArgs = @[

View File

@ -159,6 +159,7 @@ type
redirectCount: int
timestamp*: Moment
duration*: Duration
headersBuffer: seq[byte]
HttpClientRequestRef* = ref HttpClientRequest
@ -567,7 +568,8 @@ proc new(
tls =
try:
newTLSClientAsyncStream(treader, twriter, ha.hostname,
flags = session.flags.getTLSFlags())
flags = session.flags.getTLSFlags(),
bufferSize = session.connectionBufferSize)
except TLSStreamInitError as exc:
return err(exc.msg)
@ -858,6 +860,7 @@ proc closeWait*(request: HttpClientRequestRef) {.async: (raises: []).} =
await noCancel(allFutures(pending))
request.session = nil
request.error = nil
request.headersBuffer.reset()
request.state = HttpReqRespState.Closed
untrackCounter(HttpClientRequestTrackerName)
@ -991,14 +994,14 @@ proc prepareResponse(
proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {.
async: (raises: [CancelledError, HttpError]).} =
var buffer: array[HttpMaxHeadersSize, byte]
let timestamp = Moment.now()
req.connection.setTimestamp(timestamp)
let
bytesRead =
try:
await req.connection.reader.readUntil(addr buffer[0],
len(buffer), HeadersMark).wait(
await req.connection.reader.readUntil(addr req.headersBuffer[0],
len(req.headersBuffer),
HeadersMark).wait(
req.session.headersTimeout)
except AsyncTimeoutError:
raiseHttpReadError("Reading response headers timed out")
@ -1006,23 +1009,25 @@ proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {.
raiseHttpReadError(
"Could not read response headers, reason: " & $exc.msg)
let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1))
if response.isErr():
raiseHttpProtocolError(response.error())
let res = response.get()
res.setTimestamp(timestamp)
return res
let response =
prepareResponse(req,
req.headersBuffer.toOpenArray(0, bytesRead - 1)).valueOr:
raiseHttpProtocolError(error)
response.setTimestamp(timestamp)
response
proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
ha: HttpAddress, meth: HttpMethod = MethodGet,
version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = [],
body: openArray[byte] = []): HttpClientRequestRef =
let res = HttpClientRequestRef(
state: HttpReqRespState.Ready, session: session, meth: meth,
version: version, flags: flags, headers: HttpTable.init(headers),
address: ha, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body
address: ha, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body,
headersBuffer: newSeq[byte](max(maxResponseHeadersSize, HttpMaxHeadersSize))
)
trackCounter(HttpClientRequestTrackerName)
res
@ -1031,13 +1036,15 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
url: string, meth: HttpMethod = MethodGet,
version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = [],
body: openArray[byte] = []): HttpResult[HttpClientRequestRef] =
let address = ? session.getAddress(parseUri(url))
let res = HttpClientRequestRef(
state: HttpReqRespState.Ready, session: session, meth: meth,
version: version, flags: flags, headers: HttpTable.init(headers),
address: address, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body
address: address, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body,
headersBuffer: newSeq[byte](max(maxResponseHeadersSize, HttpMaxHeadersSize))
)
trackCounter(HttpClientRequestTrackerName)
ok(res)
@ -1045,48 +1052,58 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
proc get*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
url: string, version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = []
): HttpResult[HttpClientRequestRef] =
HttpClientRequestRef.new(session, url, MethodGet, version, flags, headers)
HttpClientRequestRef.new(session, url, MethodGet, version, flags,
maxResponseHeadersSize, headers)
proc get*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
ha: HttpAddress, version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = []
): HttpClientRequestRef =
HttpClientRequestRef.new(session, ha, MethodGet, version, flags, headers)
HttpClientRequestRef.new(session, ha, MethodGet, version, flags,
maxResponseHeadersSize, headers)
proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
url: string, version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = [],
body: openArray[byte] = []
): HttpResult[HttpClientRequestRef] =
HttpClientRequestRef.new(session, url, MethodPost, version, flags, headers,
body)
HttpClientRequestRef.new(session, url, MethodPost, version, flags,
maxResponseHeadersSize, headers, body)
proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
url: string, version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = [],
body: openArray[char] = []): HttpResult[HttpClientRequestRef] =
HttpClientRequestRef.new(session, url, MethodPost, version, flags, headers,
HttpClientRequestRef.new(session, url, MethodPost, version, flags,
maxResponseHeadersSize, headers,
body.toOpenArrayByte(0, len(body) - 1))
proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
ha: HttpAddress, version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = [],
body: openArray[byte] = []): HttpClientRequestRef =
HttpClientRequestRef.new(session, ha, MethodPost, version, flags, headers,
body)
HttpClientRequestRef.new(session, ha, MethodPost, version, flags,
maxResponseHeadersSize, headers, body)
proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
ha: HttpAddress, version: HttpVersion = HttpVersion11,
flags: set[HttpClientRequestFlag] = {},
maxResponseHeadersSize: int = HttpMaxHeadersSize,
headers: openArray[HttpHeaderTuple] = [],
body: openArray[char] = []): HttpClientRequestRef =
HttpClientRequestRef.new(session, ha, MethodPost, version, flags, headers,
HttpClientRequestRef.new(session, ha, MethodPost, version, flags,
maxResponseHeadersSize, headers,
body.toOpenArrayByte(0, len(body) - 1))
proc prepareRequest(request: HttpClientRequestRef): string =
@ -1327,13 +1344,18 @@ proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader {.
let reader =
case response.bodyFlag
of HttpClientBodyFlag.Sized:
let bstream = newBoundedStreamReader(response.connection.reader,
response.contentLength)
newHttpBodyReader(bstream)
newHttpBodyReader(
newBoundedStreamReader(
response.connection.reader, response.contentLength,
bufferSize = response.session.connectionBufferSize))
of HttpClientBodyFlag.Chunked:
newHttpBodyReader(newChunkedStreamReader(response.connection.reader))
newHttpBodyReader(
newChunkedStreamReader(
response.connection.reader,
bufferSize = response.session.connectionBufferSize))
of HttpClientBodyFlag.Custom:
newHttpBodyReader(newAsyncStreamReader(response.connection.reader))
newHttpBodyReader(
newAsyncStreamReader(response.connection.reader))
response.connection.state = HttpClientConnectionState.ResponseBodyReceiving
response.reader = reader
response.reader
@ -1448,8 +1470,10 @@ proc redirect*(request: HttpClientRequestRef,
var res = request.headers
res.set(HostHeader, ha.hostname)
res
var res = HttpClientRequestRef.new(request.session, ha, request.meth,
request.version, request.flags, headers.toList(), request.buffer)
var res =
HttpClientRequestRef.new(request.session, ha, request.meth,
request.version, request.flags, headers = headers.toList(),
body = request.buffer)
res.redirectCount = redirectCount
ok(res)
@ -1472,8 +1496,10 @@ proc redirect*(request: HttpClientRequestRef,
var res = request.headers
res.set(HostHeader, address.hostname)
res
var res = HttpClientRequestRef.new(request.session, address, request.meth,
request.version, request.flags, headers.toList(), request.buffer)
var res =
HttpClientRequestRef.new(request.session, address, request.meth,
request.version, request.flags, headers = headers.toList(),
body = request.buffer)
res.redirectCount = redirectCount
ok(res)

View File

@ -11,7 +11,7 @@
import std/[tables, uri, strutils]
import stew/[base10], httputils, results
import ../../[asyncloop, asyncsync]
import ../../[asyncloop, asyncsync, config]
import ../../streams/[asyncstream, boundstream, chunkstream]
import "."/[httptable, httpcommon, multipart]
from ../../transports/common import TransportAddress, ServerFlags, `$`, `==`
@ -244,7 +244,7 @@ proc new*(
serverUri = Uri(),
serverIdent = "",
maxConnections: int = -1,
bufferSize: int = 4096,
bufferSize: int = chronosTransportDefaultBufferSize,
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
@ -304,7 +304,7 @@ proc new*(
serverUri = Uri(),
serverIdent = "",
maxConnections: int = -1,
bufferSize: int = 4096,
bufferSize: int = chronosTransportDefaultBufferSize,
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
@ -1187,23 +1187,7 @@ proc closeWait*(server: HttpServerRef) {.async: (raises: []).} =
proc join*(server: HttpServerRef): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Wait until HTTP server will not be closed.
var retFuture = newFuture[void]("http.server.join")
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
server.lifetime.removeCallback(continuation, cast[pointer](retFuture))
if server.state == ServerClosed:
retFuture.complete()
else:
server.lifetime.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
retFuture
server.lifetime.join()
proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] =
## Create new MultiPartReader interface for specific request.

View File

@ -20,6 +20,7 @@ export asyncloop, httptable, httpcommon, httpbodyrw, asyncstream, httputils
const
UnableToReadMultipartBody = "Unable to read multipart message body, reason: "
UnableToSendMultipartMessage = "Unable to send multipart message, reason: "
MaxMultipartHeaderSize = 4096
type
MultiPartSource* {.pure.} = enum
@ -142,10 +143,11 @@ proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader],
MultiPartReader(kind: MultiPartSource.Buffer,
buffer: buf, offset: 0, boundary: fboundary)
proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef],
stream: HttpBodyReader,
boundary: openArray[B],
partHeadersMaxSize = 4096): MultiPartReaderRef =
proc new*[B: BChar](
mpt: typedesc[MultiPartReaderRef],
stream: HttpBodyReader,
boundary: openArray[B],
partHeadersMaxSize = MaxMultipartHeaderSize): MultiPartReaderRef =
## Create new MultiPartReader instance with `stream` interface.
##
## ``stream`` is stream used to read data.

View File

@ -10,7 +10,7 @@
{.push raises: [].}
import httpserver
import ../../asyncloop, ../../asyncsync
import ../../[asyncloop, asyncsync, config]
import ../../streams/[asyncstream, tlsstream]
export asyncloop, asyncsync, httpserver, asyncstream, tlsstream
@ -91,7 +91,7 @@ proc new*(htype: typedesc[SecureHttpServerRef],
serverIdent = "",
secureFlags: set[TLSFlags] = {},
maxConnections: int = -1,
bufferSize: int = 4096,
bufferSize: int = chronosTransportDefaultBufferSize,
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
@ -157,7 +157,7 @@ proc new*(htype: typedesc[SecureHttpServerRef],
serverIdent = "",
secureFlags: set[TLSFlags] = {},
maxConnections: int = -1,
bufferSize: int = 4096,
bufferSize: int = chronosTransportDefaultBufferSize,
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,

View File

@ -1010,12 +1010,14 @@ else:
retFuture.fail(newException(AsyncProcessError,
osErrorMsg(res.error())))
timer = nil
proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if not(isNil(timer)):
clearTimer(timer)
# Ignore any errors because of cancellation.
discard removeProcess2(processHandle)
if not(isNil(timer)):
clearTimer(timer)
timer = nil
# Ignore any errors because of cancellation.
discard removeProcess2(processHandle)
if timeout != InfiniteDuration:
timer = setTimer(Moment.fromNow(timeout), continuation, cast[pointer](2))

140
chronos/bipbuffer.nim Normal file
View File

@ -0,0 +1,140 @@
#
# Chronos
#
# (c) Copyright 2018-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
## This module implements Bip Buffer (bi-partite circular buffer) by Simone
## Cooke.
##
## The Bip-Buffer is like a circular buffer, but slightly different. Instead of
## keeping one head and tail pointer to the data in the buffer, it maintains two
## revolving regions, allowing for fast data access without having to worry
## about wrapping at the end of the buffer. Buffer allocations are always
## maintained as contiguous blocks, allowing the buffer to be used in a highly
## efficient manner with API calls, and also reducing the amount of copying
## which needs to be performed to put data into the buffer. Finally, a two-phase
## allocation system allows the user to pessimistically reserve an area of
## buffer space, and then trim back the buffer to commit to only the space which
## was used.
##
## https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
{.push raises: [].}

type
  BipPos = object
    ## Half-open [start, finish) region inside `BipBuffer.data`.
    start: Natural
    finish: Natural

  BipBuffer* = object
    ## Bi-partite circular buffer: `a` is the primary committed region,
    ## `b` is the secondary region that grows at the front of the storage
    ## once `a` has no room left after it, and `r` is the currently
    ## reserved (not yet committed) region.
    a, b, r: BipPos
    data: seq[byte]

proc init*(t: typedesc[BipBuffer], size: int): BipBuffer =
  ## Creates new Bip Buffer with size `size`.
  BipBuffer(data: newSeq[byte](size))

template len(pos: BipPos): Natural =
  ## Number of bytes covered by the region.
  pos.finish - pos.start

template reset(pos: var BipPos) =
  ## Empties the region (start = finish = 0).
  pos = BipPos()

func init(t: typedesc[BipPos], start, finish: Natural): BipPos =
  ## Creates a region covering [start, finish).
  BipPos(start: start, finish: finish)
func calcReserve(bp: BipBuffer): tuple[space: Natural, start: Natural] =
  ## Computes where the next reservation should start and how much
  ## contiguous room it has.
  ##
  ## When region `b` already exists, new data can only go into the gap
  ## between the end of `b` and the start of `a`. Otherwise the larger of
  ## the space after `a` and the space before `a` is chosen, preferring
  ## the tail so that `b` is only opened when the wrap-around side is
  ## strictly bigger.
  if len(bp.b) > 0:
    # Reserve in the gap between `b` and `a`.
    return (Natural(bp.a.start - bp.b.finish), bp.b.finish)
  let tailSpace = Natural(len(bp.data) - bp.a.finish)
  if tailSpace >= bp.a.start:
    (tailSpace, bp.a.finish)
  else:
    # Wrap around: reserve at the front, which will become region `b`.
    (bp.a.start, Natural(0))
func availSpace*(bp: BipBuffer): Natural =
  ## Returns amount of space available for reserve in buffer `bp`.
  bp.calcReserve().space
func len*(bp: BipBuffer): Natural =
  ## Returns amount of used (committed) space in buffer `bp`.
  len(bp.a) + len(bp.b)
proc reserve*(bp: var BipBuffer,
              size: Natural = 0): tuple[data: ptr byte, size: Natural] =
  ## Reserve `size` bytes in buffer.
  ##
  ## If `size == 0` (default) reserve all available space from buffer.
  ##
  ## If there is not enough space in buffer for the reservation, a defect
  ## is raised.
  ##
  ## Returns the reserved range as a pointer to its first byte together
  ## with its length. The bytes are not considered part of the buffer
  ## until `commit` is called.
  const ErrorMessage = "Not enough space available"
  doAssert(size <= len(bp.data))
  let (availableSpace, reserveStart) = bp.calcReserve()
  if availableSpace == 0:
    raiseAssert ErrorMessage
  let reserveLength =
    if size == 0:
      availableSpace
    else:
      # An explicit request can only be satisfied when it fits into the
      # contiguous region computed above. The original comparison was
      # inverted (`size < availableSpace`), which rejected every
      # satisfiable request and accepted oversize ones.
      if size > availableSpace:
        raiseAssert ErrorMessage
      size
  bp.r = BipPos.init(reserveStart, Natural(reserveStart + reserveLength))
  (addr bp.data[bp.r.start], len(bp.r))
proc commit*(bp: var BipBuffer, size: Natural) =
  ## Updates structure's pointers when new data inserted into buffer.
  doAssert(len(bp.r) >= size,
           "Committed size could not be larger than the previously reserved one")
  if size == 0:
    # Nothing was written - just drop the reservation.
    bp.r.reset()
    return
  let toCommit = min(size, len(bp.r))
  if len(bp.a) == 0 and len(bp.b) == 0:
    # Buffer was empty - the committed bytes become region `a`.
    bp.a.start = bp.r.start
    bp.a.finish = bp.r.start + toCommit
  elif bp.r.start == bp.a.finish:
    # Reservation was contiguous with `a` - extend `a`.
    bp.a.finish += toCommit
  else:
    # Reservation wrapped around to the front - bytes belong to `b`.
    bp.b.finish += toCommit
  bp.r.reset()
proc consume*(bp: var BipBuffer, size: Natural) =
  ## The procedure removes/frees `size` bytes from the buffer ``bp``.
  ##
  ## Bytes are consumed from region `a` first; once `a` is exhausted,
  ## region `b` is promoted to become the new `a` and consumption
  ## continues there.
  var remaining = size
  if remaining < len(bp.a):
    # Partial consume within region `a`.
    bp.a.start += remaining
    return
  # Region `a` is fully consumed - promote `b` into its place.
  remaining -= len(bp.a)
  bp.a = bp.b
  bp.b.reset()
  if remaining >= len(bp.a):
    # The promoted region is consumed entirely as well.
    bp.a.reset()
  else:
    bp.a.start += remaining
iterator items*(bp: BipBuffer): byte =
  ## Iterates over all the committed bytes in the buffer, region `a`
  ## first, then region `b`.
  var index = bp.a.start
  while index < bp.a.finish:
    yield bp.data[index]
    inc index
  index = bp.b.start
  while index < bp.b.finish:
    yield bp.data[index]
    inc index
iterator regions*(bp: var BipBuffer): tuple[data: ptr byte, size: Natural] =
  ## Iterates over the non-empty regions (`a` then `b`) of the buffer.
  let sizeA = len(bp.a)
  if sizeA > 0:
    yield (addr bp.data[bp.a.start], sizeA)
  let sizeB = len(bp.b)
  if sizeB > 0:
    yield (addr bp.data[bp.b.start], sizeB)

View File

@ -102,6 +102,9 @@ const
chronosStreamDefaultBufferSize* {.intdefine.} = 16384
## Default size of chronos async stream internal buffer.
chronosTLSSessionCacheBufferSize* {.intdefine.} = 4096
## Default size of chronos TLS Session cache's internal buffer.
when defined(chronosStrictException):
{.warning: "-d:chronosStrictException has been deprecated in favor of handleException".}
# In chronos v3, this setting was used as the opposite of
@ -128,6 +131,8 @@ when defined(debug) or defined(chronosConfig):
chronosTransportDefaultBufferSize)
printOption("chronosStreamDefaultBufferSize",
chronosStreamDefaultBufferSize)
printOption("chronosTLSSessionCacheBufferSize",
chronosTLSSessionCacheBufferSize)
# In nim 1.6, `sink` + local variable + `move` generates the best code for
# moving a proc parameter into a closure - this only works for closure

View File

@ -76,22 +76,11 @@ template Finished*(T: type FutureState): FutureState {.
deprecated: "Use FutureState.Completed instead".} =
FutureState.Completed
proc newFutureImpl[T](loc: ptr SrcLoc): Future[T] =
let fut = Future[T]()
internalInitFutureBase(fut, loc, FutureState.Pending, {})
fut
proc newFutureImpl[T](loc: ptr SrcLoc, flags: FutureFlags): Future[T] =
let fut = Future[T]()
internalInitFutureBase(fut, loc, FutureState.Pending, flags)
fut
proc newInternalRaisesFutureImpl[T, E](
loc: ptr SrcLoc): InternalRaisesFuture[T, E] =
let fut = InternalRaisesFuture[T, E]()
internalInitFutureBase(fut, loc, FutureState.Pending, {})
fut
proc newInternalRaisesFutureImpl[T, E](
loc: ptr SrcLoc, flags: FutureFlags): InternalRaisesFuture[T, E] =
let fut = InternalRaisesFuture[T, E]()
@ -125,7 +114,7 @@ template newInternalRaisesFuture*[T, E](fromProc: static[string] = ""): auto =
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
newInternalRaisesFutureImpl[T, E](getSrcLocation(fromProc))
newInternalRaisesFutureImpl[T, E](getSrcLocation(fromProc), {})
template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] {.deprecated.} =
## Create a new future which can hold/preserve GC sequence until future will
@ -207,14 +196,14 @@ proc finish(fut: FutureBase, state: FutureState) =
when chronosFutureTracking:
scheduleDestructor(fut)
proc complete[T](future: Future[T], val: sink T, loc: ptr SrcLoc) =
proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
if not(future.cancelled()):
checkFinished(future, loc)
doAssert(isNil(future.internalError))
future.internalValue = chronosMoveSink(val)
future.internalValue = val
future.finish(FutureState.Completed)
template complete*[T](future: Future[T], val: sink T) =
template complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``.
complete(future, val, getSrcLocation())
@ -747,8 +736,8 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
retFuture.fail(fut2.error)
else:
retFuture.complete()
fut1.callback = cb
fut2.callback = cb
fut1.addCallback(cb)
fut2.addCallback(cb)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
@ -799,7 +788,7 @@ template orImpl*[T, Y](fut1: Future[T], fut2: Future[Y]): untyped =
fut2.addCallback(cb)
retFuture.cancelCallback = cancellation
return retFuture
retFuture
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once either ``fut1`` or ``fut2``
@ -814,7 +803,7 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## completed, the result future will also be completed.
##
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.or")
var retFuture = newFuture[void]("chronos.or()")
orImpl(fut1, fut2)
@ -1055,19 +1044,24 @@ proc noCancel*[F: SomeFuture](future: F): auto = # async: (raw: true, raises: as
let retFuture = newFuture[F.T]("chronos.noCancel(T)",
{FutureFlag.OwnCancelSchedule})
template completeFuture() =
const canFail = when declared(InternalRaisesFutureRaises):
InternalRaisesFutureRaises isnot void
else:
true
if future.completed():
when F.T is void:
retFuture.complete()
else:
retFuture.complete(future.value)
elif future.failed():
when F is Future:
retFuture.fail(future.error, warn = false)
when declared(InternalRaisesFutureRaises):
when InternalRaisesFutureRaises isnot void:
retFuture.fail(future.error, warn = false)
else:
raiseAssert("Unexpected future state [" & $future.state & "]")
when canFail: # Avoid calling `failed` on non-failing raises futures
if future.failed():
retFuture.fail(future.error, warn = false)
else:
raiseAssert("Unexpected future state [" & $future.state & "]")
else:
raiseAssert("Unexpected future state [" & $future.state & "]")
proc continuation(udata: pointer) {.gcsafe.} =
completeFuture()
@ -1099,12 +1093,14 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] {.
inc(finishedFutures)
if finishedFutures == totalFutures:
retFuture.complete()
reset(nfuts)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
for fut in nfuts:
if not(fut.finished()):
@ -1161,13 +1157,14 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
if not(retFuture.finished()):
inc(finishedFutures)
if finishedFutures == totalFutures:
retFuture.complete(nfuts)
retFuture.complete(move(nfuts))
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for fut in nfuts.mitems():
if not(fut.finished()):
fut.removeCallback(cb)
reset(nfuts)
for fut in nfuts:
if not(fut.finished()):
@ -1181,7 +1178,65 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
return retFuture
proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
template oneImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: F
for i in 0..<len(nfuts):
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)
when declared(fut0):
fut0.addCallback(cb)
for fut in futs:
fut.addCallback(cb)
retFuture.cancelCallback = cancellation
return retFuture
proc one*[F: SomeFuture](fut0: F, futs: varargs[F]): Future[F] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[F]("chronos.one()")
if fut0.finished():
retFuture.complete(fut0)
return retFuture
oneImpl
proc one*[F: SomeFuture](futs: openArray[F]): Future[F] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
@ -1191,48 +1246,76 @@ proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[F]("chronos.one()")
let retFuture = newFuture[F]("chronos.one()")
if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture
oneImpl
template raceImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
# Because we can't capture openArray/varargs in closures we need to create copy.
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: F
var rfut = cast[FutureBase](udata)
var res: FutureBase
for i in 0..<len(nfuts):
if cast[FutureBase](nfuts[i]) != rfut:
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)
for fut in nfuts:
fut.addCallback(cb)
when declared(fut0):
fut0.addCallback(cb, cast[pointer](fut0))
for fut in futs:
fut.addCallback(cb, cast[pointer](fut))
retFuture.cancelCallback = cancellation
return retFuture
proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
proc race*(fut0: FutureBase, futs: varargs[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return finished FutureBase,
## when one of the given futures will be completed, failed or canceled.
##
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")
if fut0.finished:
retFuture.complete(fut0)
return retFuture
raceImpl
proc race*(futs: openArray[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed FutureBase,
## Returns a future which will complete and return finished FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
@ -1246,59 +1329,18 @@ proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture
raceImpl
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: FutureBase
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if nfuts[i] != rfut:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
retFuture.complete(res)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for fut in nfuts:
fut.addCallback(cb, cast[pointer](fut))
retFuture.cancelCallback = cancellation
return retFuture
proc race*[T](futs: varargs[Future[T]]): Future[FutureBase] {.
proc race*(futs: openArray[SomeFuture]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future COMPLETES immediately.
## If the argument is empty, the returned future FAILS immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))
proc race*[T, E](futs: varargs[InternalRaisesFuture[T, E]]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## On success returned Future will hold finished FutureBase.
##
## If the argument is empty, the returned future COMPLETES immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
## On cancel futures in ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))
@ -1351,13 +1393,15 @@ proc sleepAsync*(duration: Duration): Future[void] {.
proc completion(data: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
timer = nil # Release circular reference (for gc:arc)
proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if not isNil(timer):
clearTimer(timer)
timer = nil # Release circular reference (for gc:arc)
retFuture.cancelCallback = cancellation
timer = setTimer(moment, completion, cast[pointer](retFuture))
timer = setTimer(moment, completion)
return retFuture
proc sleepAsync*(ms: int): Future[void] {.
@ -1423,22 +1467,31 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
var
retFuture = newFuture[bool]("chronos.withTimeout",
{FutureFlag.OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
moment: Moment
timer: TimerCallback
timeouted = false
template completeFuture(fut: untyped): untyped =
template completeFuture(fut: untyped, timeout: bool): untyped =
if fut.failed() or fut.completed():
retFuture.complete(true)
else:
retFuture.cancelAndSchedule()
if timeout:
retFuture.complete(false)
else:
retFuture.cancelAndSchedule()
# TODO: raises annotation shouldn't be needed, but likely similar issue as
# https://github.com/nim-lang/Nim/issues/17369
proc continuation(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
if timeouted:
retFuture.complete(false)
# We should not unconditionally complete result future with `false`.
# Initiated by timeout handler cancellation could fail, in this case
# we could get `fut` in complete or in failed state, so we should
# complete result future with `true` instead of `false` here.
fut.completeFuture(timeouted)
return
if not(fut.finished()):
# Timer exceeded first, we going to cancel `fut` and wait until it
@ -1449,7 +1502,8 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
# Future `fut` completed/failed/cancelled first.
if not(isNil(timer)):
clearTimer(timer)
fut.completeFuture()
fut.completeFuture(false)
timer = nil
# TODO: raises annotation shouldn't be needed, but likely similar issue as
# https://github.com/nim-lang/Nim/issues/17369
@ -1459,7 +1513,8 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
clearTimer(timer)
fut.cancelSoon()
else:
fut.completeFuture()
fut.completeFuture(false)
timer = nil
if fut.finished():
retFuture.complete(true)
@ -1481,17 +1536,21 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {.
inline, deprecated: "Use withTimeout(Future[T], Duration)".} =
withTimeout(fut, timeout.milliseconds())
proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
var
moment: Moment
timer: TimerCallback
timeouted = false
proc waitUntilImpl[F: SomeFuture](fut: F, retFuture: auto,
deadline: auto): auto =
var timeouted = false
template completeFuture(fut: untyped): untyped =
template completeFuture(fut: untyped, timeout: bool): untyped =
if fut.failed():
retFuture.fail(fut.error(), warn = false)
elif fut.cancelled():
retFuture.cancelAndSchedule()
if timeout:
# Its possible that `future` could be cancelled in some other place. In
# such case we can't detect if it was our cancellation due to timeout,
# or some other cancellation.
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
retFuture.cancelAndSchedule()
else:
when type(fut).T is void:
retFuture.complete()
@ -1501,7 +1560,64 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
proc continuation(udata: pointer) {.raises: [].} =
if not(retFuture.finished()):
if timeouted:
# When timeout is exceeded and we cancelled future via cancelSoon(),
# its possible that future at this moment already has value
# and/or error.
fut.completeFuture(timeouted)
return
if not(fut.finished()):
timeouted = true
fut.cancelSoon()
else:
fut.completeFuture(false)
var cancellation: proc(udata: pointer) {.gcsafe, raises: [].}
cancellation = proc(udata: pointer) {.gcsafe, raises: [].} =
deadline.removeCallback(continuation)
if not(fut.finished()):
fut.cancelSoon()
else:
fut.completeFuture(false)
if fut.finished():
fut.completeFuture(false)
else:
if deadline.finished():
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
retFuture.cancelCallback = cancellation
fut.addCallback(continuation)
deadline.addCallback(continuation)
retFuture
proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
var
moment: Moment
timer: TimerCallback
timeouted = false
template completeFuture(fut: untyped, timeout: bool): untyped =
if fut.failed():
retFuture.fail(fut.error(), warn = false)
elif fut.cancelled():
if timeout:
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
retFuture.cancelAndSchedule()
else:
when type(fut).T is void:
retFuture.complete()
else:
retFuture.complete(fut.value)
proc continuation(udata: pointer) {.raises: [].} =
if not(retFuture.finished()):
if timeouted:
# We should not unconditionally fail `retFuture` with
# `AsyncTimeoutError`. Initiated by timeout handler cancellation
# could fail, in this case we could get `fut` in complete or in failed
# state, so we should return error/value instead of `AsyncTimeoutError`.
fut.completeFuture(timeouted)
return
if not(fut.finished()):
# Timer exceeded first.
@ -1511,7 +1627,8 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
# Future `fut` completed/failed/cancelled first.
if not(isNil(timer)):
clearTimer(timer)
fut.completeFuture()
fut.completeFuture(false)
timer = nil
var cancellation: proc(udata: pointer) {.gcsafe, raises: [].}
cancellation = proc(udata: pointer) {.gcsafe, raises: [].} =
@ -1520,10 +1637,12 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
clearTimer(timer)
fut.cancelSoon()
else:
fut.completeFuture()
fut.completeFuture(false)
timer = nil
if fut.finished():
fut.completeFuture()
fut.completeFuture(false)
else:
if timeout.isZero():
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
@ -1548,7 +1667,10 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
## TODO: In case when ``fut`` got cancelled, what result Future[T]
## should return, because it can't be cancelled too.
var
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
retFuture = newFuture[T]("chronos.wait(duration)",
{FutureFlag.OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
waitImpl(fut, retFuture, timeout)
@ -1561,6 +1683,61 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
else:
wait(fut, timeout.milliseconds())
proc wait*[T](fut: Future[T], deadline: SomeFuture): Future[T] =
## Returns a future which will complete once future ``fut`` completes
## or if ``deadline`` future completes.
##
## If `deadline` future completes before future `fut` -
## `AsyncTimeoutError` exception will be raised.
##
## Note: `deadline` future will not be cancelled and/or failed.
##
## Note: While `waitUntil(future)` operation is pending, please avoid any
## attempts to cancel future `fut`. If it happens `waitUntil()` could
## introduce undefined behavior - it could raise`CancelledError` or
## `AsyncTimeoutError`.
##
## If you need to cancel `future` - cancel `waitUntil(future)` instead.
var
retFuture = newFuture[T]("chronos.wait(future)",
{FutureFlag.OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
waitUntilImpl(fut, retFuture, deadline)
proc join*(future: FutureBase): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete once future ``future`` completes.
##
## This primitive helps to carefully monitor ``future`` state, in case of
## cancellation ``join`` operation it will not going to cancel ``future``.
##
## If ``future`` is already completed - ``join`` will return completed
## future immediately.
let retFuture = newFuture[void]("chronos.join()")
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
future.removeCallback(continuation, cast[pointer](retFuture))
if not(future.finished()):
future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
else:
retFuture.complete()
retFuture
proc join*(future: SomeFuture): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete once future ``future`` completes.
##
## This primitive helps to carefully monitor ``future`` state, in case of
## cancellation ``join`` operation it will not going to cancel ``future``.
join(FutureBase(future))
when defined(windows):
import ../osdefs
@ -1678,10 +1855,9 @@ proc `or`*[T, Y, E1, E2](
fut1: InternalRaisesFuture[T, E1],
fut2: InternalRaisesFuture[Y, E2]): auto =
type
InternalRaisesFutureRaises = union(E1, E2)
InternalRaisesFutureRaises = union(E1, E2).union((CancelledError,))
let
retFuture = newFuture[void]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
let retFuture = newFuture[void]("chronos.or()", {})
orImpl(fut1, fut2)
proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto =
@ -1691,6 +1867,21 @@ proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto =
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)
let
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
retFuture = newFuture[T]("chronos.wait(duration)", {OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
waitImpl(fut, retFuture, timeout)
proc wait*(fut: InternalRaisesFuture, deadline: SomeFuture): auto =
type
T = type(fut).T
E = type(fut).E
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)
let
retFuture = newFuture[T]("chronos.wait(future)", {OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
waitUntilImpl(fut, retFuture, deadline)

View File

@ -219,12 +219,14 @@ proc decodeParams(params: NimNode): AsyncParams =
var
raw = false
raises: NimNode = nil
handleException = chronosHandleException
handleException = false
hasLocalAnnotations = false
for param in params:
param.expectKind(nnkExprColonExpr)
if param[0].eqIdent("raises"):
hasLocalAnnotations = true
param[1].expectKind(nnkBracket)
if param[1].len == 0:
raises = makeNoRaises()
@ -236,10 +238,14 @@ proc decodeParams(params: NimNode): AsyncParams =
# boolVal doesn't work in untyped macros it seems..
raw = param[1].eqIdent("true")
elif param[0].eqIdent("handleException"):
hasLocalAnnotations = true
handleException = param[1].eqIdent("true")
else:
warning("Unrecognised async parameter: " & repr(param[0]), param)
if not hasLocalAnnotations:
handleException = chronosHandleException
(raw, raises, handleException)
proc isEmpty(n: NimNode): bool {.compileTime.} =

View File

@ -2,6 +2,8 @@ import
std/[macros, sequtils],
../futures
{.push raises: [].}
type
InternalRaisesFuture*[T, E] = ref object of Future[T]
## Future with a tuple of possible exception types
@ -10,6 +12,12 @@ type
## This type gets injected by `async: (raises: ...)` and similar utilities
## and should not be used manually as the internal exception representation
## is subject to change in future chronos versions.
# TODO https://github.com/nim-lang/Nim/issues/23418
# TODO https://github.com/nim-lang/Nim/issues/23419
when E is void:
dummy: E
else:
dummy: array[0, E]
proc makeNoRaises*(): NimNode {.compileTime.} =
# An empty tuple would have been easier but...
@ -51,17 +59,32 @@ proc members(tup: NimNode): seq[NimNode] {.compileTime.} =
macro hasException(raises: typedesc, ident: static string): bool =
newLit(raises.members.anyIt(it.eqIdent(ident)))
macro Raising*[T](F: typedesc[Future[T]], E: varargs[typedesc]): untyped =
macro Raising*[T](F: typedesc[Future[T]], E: typed): untyped =
## Given a Future type instance, return a type storing `{.raises.}`
## information
##
## Note; this type may change in the future
E.expectKind(nnkBracket)
let raises = if E.len == 0:
# An earlier version used `E: varargs[typedesc]` here but this is buggyt/no
# longer supported in 2.0 in certain cases:
# https://github.com/nim-lang/Nim/issues/23432
let
e =
case E.getTypeInst().typeKind()
of ntyTypeDesc: @[E]
of ntyArray:
for x in E:
if x.getTypeInst().typeKind != ntyTypeDesc:
error("Expected typedesc, got " & repr(x), x)
E.mapIt(it)
else:
error("Expected typedesc, got " & repr(E), E)
@[]
let raises = if e.len == 0:
makeNoRaises()
else:
nnkTupleConstr.newTree(E.mapIt(it))
nnkTupleConstr.newTree(e)
nnkBracketExpr.newTree(
ident "InternalRaisesFuture",
nnkDotExpr.newTree(F, ident"T"),
@ -142,7 +165,7 @@ macro union*(tup0: typedesc, tup1: typedesc): typedesc =
if not found:
result.add err
for err2 in getType(getTypeInst(tup1)[1])[1..^1]:
for err2 in tup1.members():
result.add err2
if result.len == 0:
result = makeNoRaises()
@ -205,13 +228,20 @@ macro checkRaises*[T: CatchableError](
`warning`
assert(`runtimeChecker`, `errorMsg`)
proc error*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {.
func failed*[T](future: InternalRaisesFuture[T, void]): bool {.inline.} =
## Determines whether ``future`` finished with an error.
static:
warning("No exceptions possible with this operation, `failed` always returns false")
false
func error*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {.
raises: [].} =
static:
warning("No exceptions possible with this operation, `error` always returns nil")
nil
proc readError*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {.
func readError*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {.
raises: [ValueError].} =
static:
warning("No exceptions possible with this operation, `readError` always raises")

View File

@ -220,7 +220,10 @@ proc selectInto2*[T](s: Selector[T], timeout: int,
verifySelectParams(timeout, -1, int(high(cint)))
let
maxEventsCount = min(len(s.pollfds), len(readyKeys))
maxEventsCount = culong(min(len(s.pollfds), len(readyKeys)))
# Without `culong` conversion, this code could fail with RangeError
# defect on explicit Tnfds(integer) conversion (probably related to
# combination of nim+clang (android toolchain)).
eventsCount =
if maxEventsCount > 0:
let res = handleEintr(poll(addr(s.pollfds[0]), Tnfds(maxEventsCount),

View File

@ -965,7 +965,7 @@ elif defined(macos) or defined(macosx):
events*: cshort
revents*: cshort
Tnfds* {.importc: "nfds_t", header: "<poll.h>".} = cuint
Tnfds* {.importc: "nfds_t", header: "<poll.h>".} = culong
const
POLLIN* = 0x0001

View File

@ -9,7 +9,7 @@
{.push raises: [].}
import ../[config, asyncloop, asyncsync]
import ../[config, asyncloop, asyncsync, bipbuffer]
import ../transports/[common, stream]
export asyncloop, asyncsync, stream, common
@ -34,10 +34,11 @@ type
AsyncStreamWriteEOFError* = object of AsyncStreamWriteError
AsyncBuffer* = object
offset*: int
buffer*: seq[byte]
backend*: BipBuffer
events*: array[2, AsyncEvent]
AsyncBufferRef* = ref AsyncBuffer
WriteType* = enum
Pointer, Sequence, String
@ -73,7 +74,7 @@ type
tsource*: StreamTransport
readerLoop*: StreamReaderLoop
state*: AsyncStreamState
buffer*: AsyncBuffer
buffer*: AsyncBufferRef
udata: pointer
error*: ref AsyncStreamError
bytesCount*: uint64
@ -96,85 +97,51 @@ type
AsyncStreamRW* = AsyncStreamReader | AsyncStreamWriter
proc init*(t: typedesc[AsyncBuffer], size: int): AsyncBuffer =
AsyncBuffer(
buffer: newSeq[byte](size),
events: [newAsyncEvent(), newAsyncEvent()],
offset: 0
proc new*(t: typedesc[AsyncBufferRef], size: int): AsyncBufferRef =
AsyncBufferRef(
backend: BipBuffer.init(size),
events: [newAsyncEvent(), newAsyncEvent()]
)
proc getBuffer*(sb: AsyncBuffer): pointer {.inline.} =
unsafeAddr sb.buffer[sb.offset]
proc bufferLen*(sb: AsyncBuffer): int {.inline.} =
len(sb.buffer) - sb.offset
proc getData*(sb: AsyncBuffer): pointer {.inline.} =
unsafeAddr sb.buffer[0]
template dataLen*(sb: AsyncBuffer): int =
sb.offset
proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} =
doAssert(index < sb.offset)
sb.buffer[index]
proc update*(sb: var AsyncBuffer, size: int) {.inline.} =
sb.offset += size
template wait*(sb: var AsyncBuffer): untyped =
template wait*(sb: AsyncBufferRef): untyped =
sb.events[0].clear()
sb.events[1].fire()
sb.events[0].wait()
template transfer*(sb: var AsyncBuffer): untyped =
template transfer*(sb: AsyncBufferRef): untyped =
sb.events[1].clear()
sb.events[0].fire()
sb.events[1].wait()
proc forget*(sb: var AsyncBuffer) {.inline.} =
proc forget*(sb: AsyncBufferRef) {.inline.} =
sb.events[1].clear()
sb.events[0].fire()
proc shift*(sb: var AsyncBuffer, size: int) {.inline.} =
if sb.offset > size:
moveMem(addr sb.buffer[0], addr sb.buffer[size], sb.offset - size)
sb.offset = sb.offset - size
else:
sb.offset = 0
proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} =
copyMem(cast[pointer](cast[uint](dest) + cast[uint](offset)),
unsafeAddr sb.buffer[0], length)
proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte,
proc upload*(sb: AsyncBufferRef, pbytes: ptr byte,
nbytes: int): Future[void] {.
async: (raises: [CancelledError]).} =
## You can upload any amount of bytes to the buffer. If size of internal
## buffer is not enough to fit all the data at once, data will be uploaded
## via chunks of size up to internal buffer size.
var length = nbytes
var srcBuffer = cast[ptr UncheckedArray[byte]](pbytes)
var srcOffset = 0
var
length = nbytes
srcBuffer = pbytes.toUnchecked()
offset = 0
while length > 0:
let size = min(length, sb[].bufferLen())
let size = min(length, sb.backend.availSpace())
if size == 0:
# Internal buffer is full, we need to transfer data to consumer.
await sb[].transfer()
# Internal buffer is full, we need to notify consumer.
await sb.transfer()
else:
let (data, _) = sb.backend.reserve()
# Copy data from `pbytes` to internal buffer.
copyMem(addr sb[].buffer[sb.offset], addr srcBuffer[srcOffset], size)
sb[].offset = sb[].offset + size
srcOffset = srcOffset + size
copyMem(data, addr srcBuffer[offset], size)
sb.backend.commit(size)
offset = offset + size
length = length - size
# We notify consumers that new data is available.
sb[].forget()
template toDataOpenArray*(sb: AsyncBuffer): auto =
toOpenArray(sb.buffer, 0, sb.offset - 1)
template toBufferOpenArray*(sb: AsyncBuffer): auto =
toOpenArray(sb.buffer, sb.offset, len(sb.buffer) - 1)
sb.forget()
template copyOut*(dest: pointer, item: WriteItem, length: int) =
if item.kind == Pointer:
@ -243,7 +210,7 @@ proc atEof*(rstream: AsyncStreamReader): bool =
rstream.rsource.atEof()
else:
(rstream.state != AsyncStreamState.Running) and
(rstream.buffer.dataLen() == 0)
(len(rstream.buffer.backend) == 0)
proc atEof*(wstream: AsyncStreamWriter): bool =
## Returns ``true`` is writing stream ``wstream`` closed or finished.
@ -331,12 +298,12 @@ template checkStreamFinished*(t: untyped) =
template readLoop(body: untyped): untyped =
while true:
if rstream.buffer.dataLen() == 0:
if len(rstream.buffer.backend) == 0:
if rstream.state == AsyncStreamState.Error:
raise rstream.error
let (consumed, done) = body
rstream.buffer.shift(consumed)
rstream.buffer.backend.consume(consumed)
rstream.bytesCount = rstream.bytesCount + uint64(consumed)
if done:
break
@ -350,7 +317,7 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store
## it to ``pbytes``.
##
## If EOF is received and ``nbytes`` is not yet readed, the procedure
## If EOF is received and ``nbytes`` is not yet read, the procedure
## will raise ``AsyncStreamIncompleteError``.
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
doAssert(nbytes >= 0, "nbytes must be non-negative integer")
@ -373,17 +340,23 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
if isNil(rstream.readerLoop):
await readExactly(rstream.rsource, pbytes, nbytes)
else:
var index = 0
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var
index = 0
pbuffer = pbytes.toUnchecked()
readLoop():
if rstream.buffer.dataLen() == 0:
if len(rstream.buffer.backend) == 0:
if rstream.atEof():
raise newAsyncStreamIncompleteError()
let count = min(nbytes - index, rstream.buffer.dataLen())
if count > 0:
rstream.buffer.copyData(addr pbuffer[index], 0, count)
index += count
(consumed: count, done: index == nbytes)
var bytesRead = 0
for (region, rsize) in rstream.buffer.backend.regions():
let count = min(nbytes - index, rsize)
bytesRead += count
if count > 0:
copyMem(addr pbuffer[index], region, count)
index += count
if index == nbytes:
break
(consumed: bytesRead, done: index == nbytes)
proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int): Future[int] {.
@ -407,15 +380,21 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
if isNil(rstream.readerLoop):
return await readOnce(rstream.rsource, pbytes, nbytes)
else:
var count = 0
var
pbuffer = pbytes.toUnchecked()
index = 0
readLoop():
if rstream.buffer.dataLen() == 0:
if len(rstream.buffer.backend) == 0:
(0, rstream.atEof())
else:
count = min(rstream.buffer.dataLen(), nbytes)
rstream.buffer.copyData(pbytes, 0, count)
(count, true)
return count
for (region, rsize) in rstream.buffer.backend.regions():
let size = min(rsize, nbytes - index)
copyMem(addr pbuffer[index], region, size)
index += size
if index >= nbytes:
break
(index, true)
index
proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.
@ -456,28 +435,32 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
if isNil(rstream.readerLoop):
return await readUntil(rstream.rsource, pbytes, nbytes, sep)
else:
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var state = 0
var k = 0
var
pbuffer = pbytes.toUnchecked()
state = 0
k = 0
readLoop():
if rstream.atEof():
raise newAsyncStreamIncompleteError()
var index = 0
while index < rstream.buffer.dataLen():
for ch in rstream.buffer.backend:
if k >= nbytes:
raise newAsyncStreamLimitError()
let ch = rstream.buffer[index]
inc(index)
pbuffer[k] = ch
inc(k)
if sep[state] == ch:
inc(state)
if state == len(sep):
break
else:
state = 0
(index, state == len(sep))
return k
k
proc readLine*(rstream: AsyncStreamReader, limit = 0,
sep = "\r\n"): Future[string] {.
@ -507,18 +490,19 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
return await readLine(rstream.rsource, limit, sep)
else:
let lim = if limit <= 0: -1 else: limit
var state = 0
var res = ""
var
state = 0
res = ""
readLoop():
if rstream.atEof():
(0, true)
else:
var index = 0
while index < rstream.buffer.dataLen():
let ch = char(rstream.buffer[index])
for ch in rstream.buffer.backend:
inc(index)
if sep[state] == ch:
if sep[state] == char(ch):
inc(state)
if state == len(sep):
break
@ -529,11 +513,14 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
res.add(sep[0 ..< missing])
else:
res.add(sep[0 ..< state])
res.add(ch)
state = 0
res.add(char(ch))
if len(res) == lim:
break
(index, (state == len(sep)) or (lim == len(res)))
return res
res
proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -555,15 +542,17 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.
if isNil(rstream.readerLoop):
return await read(rstream.rsource)
else:
var res = newSeq[byte]()
var res: seq[byte]
readLoop():
if rstream.atEof():
(0, true)
else:
let count = rstream.buffer.dataLen()
res.add(rstream.buffer.buffer.toOpenArray(0, count - 1))
(count, false)
return res
var bytesRead = 0
for (region, rsize) in rstream.buffer.backend.regions():
bytesRead += rsize
res.add(region.toUnchecked().toOpenArray(0, rsize - 1))
(bytesRead, false)
res
proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -592,10 +581,13 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.
if rstream.atEof():
(0, true)
else:
let count = min(rstream.buffer.dataLen(), n - len(res))
res.add(rstream.buffer.buffer.toOpenArray(0, count - 1))
(count, len(res) == n)
return res
var bytesRead = 0
for (region, rsize) in rstream.buffer.backend.regions():
let count = min(rsize, n - len(res))
bytesRead += count
res.add(region.toUnchecked().toOpenArray(0, count - 1))
(bytesRead, len(res) == n)
res
proc consume*(rstream: AsyncStreamReader): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -622,9 +614,10 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.
if rstream.atEof():
(0, true)
else:
res += rstream.buffer.dataLen()
(rstream.buffer.dataLen(), false)
return res
let used = len(rstream.buffer.backend)
res += used
(used, false)
res
proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -652,13 +645,12 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.
else:
var res = 0
readLoop():
if rstream.atEof():
(0, true)
else:
let count = min(rstream.buffer.dataLen(), n - res)
res += count
(count, res == n)
return res
let
used = len(rstream.buffer.backend)
count = min(used, n - res)
res += count
(count, res == n)
res
proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -689,15 +681,18 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
await readMessage(rstream.rsource, pred)
else:
readLoop():
let count = rstream.buffer.dataLen()
if count == 0:
if len(rstream.buffer.backend) == 0:
if rstream.atEof():
pred([])
else:
# Case, when transport's buffer is not yet filled with data.
(0, false)
else:
pred(rstream.buffer.buffer.toOpenArray(0, count - 1))
var res: tuple[consumed: int, done: bool]
for (region, rsize) in rstream.buffer.backend.regions():
res = pred(region.toUnchecked().toOpenArray(0, rsize - 1))
break
res
proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
nbytes: int) {.
@ -736,7 +731,7 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
msglen = -1) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer
@ -771,14 +766,14 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
wstream.bytesCount = wstream.bytesCount + uint64(length)
else:
let item = WriteItem(
kind: Sequence, dataSeq: move(sbytes), size: length,
kind: Sequence, dataSeq: sbytes, size: length,
future: Future[void].Raising([CancelledError, AsyncStreamError])
.init("async.stream.write(seq)"))
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
proc write*(wstream: AsyncStreamWriter, sbytes: string,
msglen = -1) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
@ -812,7 +807,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
wstream.bytesCount = wstream.bytesCount + uint64(length)
else:
let item = WriteItem(
kind: String, dataStr: move(sbytes), size: length,
kind: String, dataStr: sbytes, size: length,
future: Future[void].Raising([CancelledError, AsyncStreamError])
.init("async.stream.write(string)"))
await wstream.queue.put(item)
@ -841,24 +836,7 @@ proc join*(rw: AsyncStreamRW): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Get Future[void] which will be completed when stream become finished or
## closed.
when rw is AsyncStreamReader:
var retFuture = newFuture[void]("async.stream.reader.join")
else:
var retFuture = newFuture[void]("async.stream.writer.join")
proc continuation(udata: pointer) {.gcsafe, raises:[].} =
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe, raises:[].} =
rw.future.removeCallback(continuation, cast[pointer](retFuture))
if not(rw.future.finished()):
rw.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
else:
retFuture.complete()
return retFuture
rw.future.join()
proc close*(rw: AsyncStreamRW) =
## Close and frees resources of stream ``rw``.
@ -951,7 +929,8 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.buffer = AsyncBuffer.init(bufferSize)
let size = max(AsyncStreamDefaultBufferSize, bufferSize)
child.buffer = AsyncBufferRef.new(size)
trackCounter(AsyncStreamReaderTrackerName)
child.startReader()
@ -963,7 +942,8 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.buffer = AsyncBuffer.init(bufferSize)
let size = max(AsyncStreamDefaultBufferSize, bufferSize)
child.buffer = AsyncBufferRef.new(size)
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
@ -1102,6 +1082,22 @@ proc newAsyncStreamReader*(tsource: StreamTransport): AsyncStreamReader =
res.init(tsource)
res
proc newAsyncStreamReader*[T](rsource: AsyncStreamReader,
udata: ref T): AsyncStreamReader =
## Create copy of AsyncStreamReader object ``rsource``.
##
## ``udata`` - user object which will be associated with new AsyncStreamReader
## object.
var res = AsyncStreamReader()
res.init(rsource, udata)
res
proc newAsyncStreamReader*(rsource: AsyncStreamReader): AsyncStreamReader =
## Create copy of AsyncStreamReader object ``rsource``.
var res = AsyncStreamReader()
res.init(rsource)
res
proc newAsyncStreamWriter*[T](wsource: AsyncStreamWriter,
loop: StreamWriterLoop,
queueSize = AsyncStreamDefaultQueueSize,
@ -1167,22 +1163,6 @@ proc newAsyncStreamWriter*(wsource: AsyncStreamWriter): AsyncStreamWriter =
res.init(wsource)
res
proc newAsyncStreamReader*[T](rsource: AsyncStreamWriter,
udata: ref T): AsyncStreamWriter =
## Create copy of AsyncStreamReader object ``rsource``.
##
## ``udata`` - user object which will be associated with new AsyncStreamReader
## object.
var res = AsyncStreamReader()
res.init(rsource, udata)
res
proc newAsyncStreamReader*(rsource: AsyncStreamReader): AsyncStreamReader =
## Create copy of AsyncStreamReader object ``rsource``.
var res = AsyncStreamReader()
res.init(rsource)
res
proc getUserData*[T](rw: AsyncStreamRW): T {.inline.} =
## Obtain user data associated with AsyncStreamReader or AsyncStreamWriter
## object ``rw``.

View File

@ -18,8 +18,8 @@
{.push raises: [].}
import results
import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common
import ../[asyncloop, timer, bipbuffer, config]
import asyncstream, ../transports/[stream, common]
export asyncloop, asyncstream, stream, timer, common
type
@ -44,7 +44,7 @@ type
BoundedStreamRW* = BoundedStreamReader | BoundedStreamWriter
const
BoundedBufferSize* = 4096
BoundedBufferSize* = chronosStreamDefaultBufferSize
BoundarySizeDefectMessage = "Boundary must not be empty array"
template newBoundedStreamIncompleteError(): ref BoundedStreamError =
@ -103,7 +103,7 @@ func endsWith(s, suffix: openArray[byte]): bool =
proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var rstream = BoundedStreamReader(stream)
rstream.state = AsyncStreamState.Running
var buffer = newSeq[byte](rstream.buffer.bufferLen())
var buffer = newSeq[byte](rstream.buffer.backend.availSpace())
while true:
let toRead =
if rstream.boundSize.isNone():
@ -127,7 +127,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], length)
await upload(rstream.buffer, addr buffer[0], length)
if rstream.state == AsyncStreamState.Running:
rstream.state = AsyncStreamState.Finished
else:
@ -135,7 +135,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
await upload(rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
@ -151,7 +151,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
await upload(rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop

View File

@ -11,13 +11,13 @@
{.push raises: [].}
import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common
import ../[asyncloop, timer, bipbuffer, config]
import asyncstream, ../transports/[stream, common]
import results
export asyncloop, asyncstream, stream, timer, common, results
const
ChunkBufferSize = 4096
ChunkBufferSize = chronosStreamDefaultBufferSize
MaxChunkHeaderSize = 1024
ChunkHeaderValueSize = 8
# This is limit for chunk size to 8 hexadecimal digits, so maximum
@ -118,11 +118,11 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var chunksize = cres.get()
if chunksize > 0'u64:
while chunksize > 0'u64:
let toRead = int(min(chunksize,
uint64(rstream.buffer.bufferLen())))
await rstream.rsource.readExactly(rstream.buffer.getBuffer(),
toRead)
rstream.buffer.update(toRead)
let
(data, rsize) = rstream.buffer.backend.reserve()
toRead = int(min(chunksize, uint64(rsize)))
await rstream.rsource.readExactly(data, toRead)
rstream.buffer.backend.commit(toRead)
await rstream.buffer.transfer()
chunksize = chunksize - uint64(toRead)

View File

@ -16,9 +16,12 @@ import
bearssl/[brssl, ec, errors, pem, rsa, ssl, x509],
bearssl/certs/cacert
import ".."/[asyncloop, asyncsync, config, timer]
import asyncstream, ../transports/stream, ../transports/common
import asyncstream, ../transports/[stream, common]
export asyncloop, asyncsync, timer, asyncstream
const
TLSSessionCacheBufferSize* = chronosTLSSessionCacheBufferSize
type
TLSStreamKind {.pure.} = enum
Client, Server
@ -158,7 +161,7 @@ proc tlsWriteRec(engine: ptr SslEngineContext,
var length = 0'u
var buf = sslEngineSendrecBuf(engine[], length)
doAssert(length != 0 and not isNil(buf))
await writer.wsource.write(chronosMoveSink(buf), int(length))
await writer.wsource.write(buf, int(length))
sslEngineSendrecAck(engine[], length)
TLSResult.Success
except AsyncStreamError as exc:
@ -239,7 +242,7 @@ proc tlsReadApp(engine: ptr SslEngineContext,
try:
var length = 0'u
var buf = sslEngineRecvappBuf(engine[], length)
await upload(addr reader.buffer, buf, int(length))
await upload(reader.buffer, buf, int(length))
sslEngineRecvappAck(engine[], length)
TLSResult.Success
except CancelledError:
@ -507,8 +510,10 @@ proc newTLSClientAsyncStream*(
if TLSFlags.NoVerifyHost in flags:
sslClientInitFull(res.ccontext, addr res.x509, nil, 0)
x509NoanchorInit(res.xwc, addr res.x509.vtable)
sslEngineSetX509(res.ccontext.eng, addr res.xwc.vtable)
x509NoanchorInit(res.xwc,
X509ClassPointerConst(addr res.x509.vtable))
sslEngineSetX509(res.ccontext.eng,
X509ClassPointerConst(addr res.xwc.vtable))
else:
when trustAnchors is TrustAnchorStore:
res.trustAnchors = trustAnchors
@ -608,7 +613,8 @@ proc newTLSServerAsyncStream*(rsource: AsyncStreamReader,
uint16(maxVersion))
if not isNil(cache):
sslServerSetCache(res.scontext, addr cache.context.vtable)
sslServerSetCache(
res.scontext, SslSessionCacheClassPointerConst(addr cache.context.vtable))
if TLSFlags.EnforceServerPref in flags:
sslEngineAddFlags(res.scontext.eng, OPT_ENFORCE_SERVER_PREFERENCES)
@ -777,11 +783,12 @@ proc init*(tt: typedesc[TLSCertificate],
raiseTLSStreamProtocolError("Could not find any certificates")
res
proc init*(tt: typedesc[TLSSessionCache], size: int = 4096): TLSSessionCache =
proc init*(tt: typedesc[TLSSessionCache],
size: int = TLSSessionCacheBufferSize): TLSSessionCache =
## Create new TLS session cache with size ``size``.
##
## One cached item is near 100 bytes size.
var rsize = min(size, 4096)
let rsize = min(size, 4096)
var res = TLSSessionCache(storage: newSeq[byte](rsize))
sslSessionCacheLruInit(addr res.context, addr res.storage[0], rsize)
res

View File

@ -370,53 +370,42 @@ template add(a: var string, b: Base10Buf[uint64]) =
for index in 0 ..< b.len:
a.add(char(b.data[index]))
func `$`*(a: Duration): string {.inline.} =
## Returns string representation of Duration ``a`` as nanoseconds value.
var res = ""
var v = a.value
func toString*(a: timer.Duration, parts = int.high): string =
## Returns a pretty string representation of Duration ``a`` - the
## number of parts returned can be limited thus truncating the output to
## an approximation that grows more precise as the duration becomes smaller
var
res = newStringOfCap(32)
v = a.nanoseconds()
parts = parts
template f(n: string, T: Duration) =
if parts <= 0:
return res
if v >= T.nanoseconds():
res.add(Base10.toBytes(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0:
return res
f("w", Week)
f("d", Day)
f("h", Hour)
f("m", Minute)
f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)
if v >= Week.value:
res.add(Base10.toBytes(uint64(v div Week.value)))
res.add('w')
v = v mod Week.value
if v == 0: return res
if v >= Day.value:
res.add(Base10.toBytes(uint64(v div Day.value)))
res.add('d')
v = v mod Day.value
if v == 0: return res
if v >= Hour.value:
res.add(Base10.toBytes(uint64(v div Hour.value)))
res.add('h')
v = v mod Hour.value
if v == 0: return res
if v >= Minute.value:
res.add(Base10.toBytes(uint64(v div Minute.value)))
res.add('m')
v = v mod Minute.value
if v == 0: return res
if v >= Second.value:
res.add(Base10.toBytes(uint64(v div Second.value)))
res.add('s')
v = v mod Second.value
if v == 0: return res
if v >= Millisecond.value:
res.add(Base10.toBytes(uint64(v div Millisecond.value)))
res.add('m')
res.add('s')
v = v mod Millisecond.value
if v == 0: return res
if v >= Microsecond.value:
res.add(Base10.toBytes(uint64(v div Microsecond.value)))
res.add('u')
res.add('s')
v = v mod Microsecond.value
if v == 0: return res
res.add(Base10.toBytes(uint64(v div Nanosecond.value)))
res.add('n')
res.add('s')
res
func `$`*(a: Duration): string {.inline.} =
## Returns string representation of Duration ``a``.
a.toString()
func `$`*(a: Moment): string {.inline.} =
## Returns string representation of Moment ``a`` as nanoseconds value.
var res = ""

View File

@ -10,6 +10,7 @@
{.push raises: [].}
import std/[strutils]
import results
import stew/[base10, byteutils]
import ".."/[config, asyncloop, osdefs, oserrno, handles]
@ -18,7 +19,7 @@ from std/net import Domain, `==`, IpAddress, IpAddressFamily, parseIpAddress,
from std/nativesockets import toInt, `$`
export Domain, `==`, IpAddress, IpAddressFamily, parseIpAddress, SockType,
Protocol, Port, toInt, `$`
Protocol, Port, toInt, `$`, results
const
DefaultStreamBufferSize* = chronosTransportDefaultBufferSize
@ -29,7 +30,7 @@ type
ServerFlags* = enum
## Server's flags
ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData, FirstPipe,
NoPipeFlash, Broadcast
NoPipeFlash, Broadcast, V4Mapped
DualStackType* {.pure.} = enum
Auto, Enabled, Disabled, Default
@ -200,6 +201,15 @@ proc `$`*(address: TransportAddress): string =
of AddressFamily.None:
"None"
proc toIpAddress*(address: TransportAddress): IpAddress =
case address.family
of AddressFamily.IPv4:
IpAddress(family: IpAddressFamily.IPv4, address_v4: address.address_v4)
of AddressFamily.IPv6:
IpAddress(family: IpAddressFamily.IPv6, address_v6: address.address_v6)
else:
raiseAssert "IpAddress do not support address family " & $address.family
proc toHex*(address: TransportAddress): string =
## Returns hexadecimal representation of ``address``.
case address.family
@ -783,3 +793,25 @@ proc setDualstack*(socket: AsyncFD,
else:
? getDomain(socket)
setDualstack(socket, family, flag)
proc getAutoAddress*(port: Port): TransportAddress =
var res =
if isAvailable(AddressFamily.IPv6):
AnyAddress6
else:
AnyAddress
res.port = port
res
proc getAutoAddresses*(
localPort: Port,
remotePort: Port
): tuple[local: TransportAddress, remote: TransportAddress] =
var (local, remote) =
if isAvailable(AddressFamily.IPv6):
(AnyAddress6, AnyAddress6)
else:
(AnyAddress, AnyAddress)
local.port = localPort
remote.port = remotePort
(local, remote)

View File

@ -10,11 +10,14 @@
{.push raises: [].}
import std/deques
import results
when not(defined(windows)): import ".."/selectors2
import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles]
import "."/common
import ".."/[asyncloop, osdefs, oserrno, osutils, handles]
import "."/[common, ipnet]
import stew/ptrops
export results
type
VectorKind = enum
WithoutAddress, WithAddress
@ -60,29 +63,78 @@ type
const
DgramTransportTrackerName* = "datagram.transport"
proc getRemoteAddress(transp: DatagramTransport,
address: Sockaddr_storage, length: SockLen,
): TransportAddress =
var raddr: TransportAddress
fromSAddr(unsafeAddr address, length, raddr)
if ServerFlags.V4Mapped in transp.flags:
if raddr.isV4Mapped(): raddr.toIPv4() else: raddr
else:
raddr
proc getRemoteAddress(transp: DatagramTransport): TransportAddress =
transp.getRemoteAddress(transp.raddr, transp.ralen)
proc setRemoteAddress(transp: DatagramTransport,
address: TransportAddress): TransportAddress =
let
fixedAddress =
when defined(windows):
windowsAnyAddressFix(address)
else:
address
remoteAddress =
if ServerFlags.V4Mapped in transp.flags:
if address.family == AddressFamily.IPv4:
fixedAddress.toIPv6()
else:
fixedAddress
else:
fixedAddress
toSAddr(remoteAddress, transp.waddr, transp.walen)
remoteAddress
proc remoteAddress2*(
transp: DatagramTransport
): Result[TransportAddress, OSErrorCode] =
## Returns ``transp`` remote socket address.
if transp.remote.family == AddressFamily.None:
var
saddr: Sockaddr_storage
slen = SockLen(sizeof(saddr))
if getpeername(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
return err(osLastError())
transp.remote = transp.getRemoteAddress(saddr, slen)
ok(transp.remote)
proc localAddress2*(
transp: DatagramTransport
): Result[TransportAddress, OSErrorCode] =
## Returns ``transp`` local socket address.
if transp.local.family == AddressFamily.None:
var
saddr: Sockaddr_storage
slen = SockLen(sizeof(saddr))
if getsockname(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
return err(osLastError())
fromSAddr(addr saddr, slen, transp.local)
ok(transp.local)
func toException(v: OSErrorCode): ref TransportOsError =
getTransportOsError(v)
proc remoteAddress*(transp: DatagramTransport): TransportAddress {.
raises: [TransportOsError].} =
## Returns ``transp`` remote socket address.
if transp.remote.family == AddressFamily.None:
var saddr: Sockaddr_storage
var slen = SockLen(sizeof(saddr))
if getpeername(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
raiseTransportOsError(osLastError())
fromSAddr(addr saddr, slen, transp.remote)
transp.remote
remoteAddress2(transp).tryGet()
proc localAddress*(transp: DatagramTransport): TransportAddress {.
raises: [TransportOsError].} =
## Returns ``transp`` local socket address.
if transp.local.family == AddressFamily.None:
var saddr: Sockaddr_storage
var slen = SockLen(sizeof(saddr))
if getsockname(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
raiseTransportOsError(osLastError())
fromSAddr(addr saddr, slen, transp.local)
transp.local
## Returns ``transp`` remote socket address.
localAddress2(transp).tryGet()
template setReadError(t, e: untyped) =
(t).state.incl(ReadError)
@ -124,8 +176,8 @@ when defined(windows):
transp.setWriterWSABuffer(vector)
let ret =
if vector.kind == WithAddress:
var fixedAddress = windowsAnyAddressFix(vector.address)
toSAddr(fixedAddress, transp.waddr, transp.walen)
# We only need `Sockaddr_storage` data here, so result discarded.
discard transp.setRemoteAddress(vector.address)
wsaSendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[ptr SockAddr](addr transp.waddr),
cint(transp.walen),
@ -159,22 +211,24 @@ when defined(windows):
proc readDatagramLoop(udata: pointer) =
var
bytesCount: uint32
raddr: TransportAddress
var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[DatagramTransport](ovl.data.udata)
ovl = cast[PtrCustomOverlapped](udata)
let transp = cast[DatagramTransport](ovl.data.udata)
while true:
if ReadPending in transp.state:
## Continuation
transp.state.excl(ReadPending)
let err = transp.rovl.data.errCode
let
err = transp.rovl.data.errCode
remoteAddress = transp.getRemoteAddress()
case err
of OSErrorCode(-1):
let bytesCount = transp.rovl.data.bytesCount
if bytesCount == 0:
transp.state.incl({ReadEof, ReadPaused})
fromSAddr(addr transp.raddr, transp.ralen, raddr)
transp.buflen = int(bytesCount)
asyncSpawn transp.function(transp, raddr)
asyncSpawn transp.function(transp, remoteAddress)
of ERROR_OPERATION_ABORTED:
# CancelIO() interrupt or closeSocket() call.
transp.state.incl(ReadPaused)
@ -189,7 +243,7 @@ when defined(windows):
transp.setReadError(err)
transp.state.incl(ReadPaused)
transp.buflen = 0
asyncSpawn transp.function(transp, raddr)
asyncSpawn transp.function(transp, remoteAddress)
else:
## Initiation
if transp.state * {ReadEof, ReadClosed, ReadError} == {}:
@ -220,7 +274,7 @@ when defined(windows):
transp.state.incl(ReadPaused)
transp.setReadError(err)
transp.buflen = 0
asyncSpawn transp.function(transp, raddr)
asyncSpawn transp.function(transp, transp.getRemoteAddress())
else:
# Transport closure happens in callback, and we not started new
# WSARecvFrom session.
@ -341,18 +395,25 @@ when defined(windows):
closeSocket(localSock)
raiseTransportOsError(err)
res.flags =
block:
# Add `V4Mapped` flag when `::` address is used and dualstack is
# set to enabled or auto.
var res = flags
if (local.family == AddressFamily.IPv6) and local.isAnyLocal():
if dualstack in {DualStackType.Enabled, DualStackType.Auto}:
res.incl(ServerFlags.V4Mapped)
res
if remote.port != Port(0):
var fixedAddress = windowsAnyAddressFix(remote)
var saddr: Sockaddr_storage
var slen: SockLen
toSAddr(fixedAddress, saddr, slen)
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
let remoteAddress = res.setRemoteAddress(remote)
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr res.waddr),
res.walen) != 0:
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
res.remote = fixedAddress
res.remote = remoteAddress
res.fd = localSock
res.function = cbproc
@ -362,12 +423,12 @@ when defined(windows):
res.state = {ReadPaused, WritePaused}
res.future = Future[void].Raising([]).init(
"datagram.transport", {FutureFlag.OwnCancelSchedule})
res.rovl.data = CompletionData(cb: readDatagramLoop,
udata: cast[pointer](res))
res.wovl.data = CompletionData(cb: writeDatagramLoop,
udata: cast[pointer](res))
res.rwsabuf = WSABUF(buf: cast[cstring](baseAddr res.buffer),
len: ULONG(len(res.buffer)))
res.rovl.data = CompletionData(
cb: readDatagramLoop, udata: cast[pointer](res))
res.wovl.data = CompletionData(
cb: writeDatagramLoop, udata: cast[pointer](res))
res.rwsabuf = WSABUF(
buf: cast[cstring](baseAddr res.buffer), len: ULONG(len(res.buffer)))
GC_ref(res)
# Start tracking transport
trackCounter(DgramTransportTrackerName)
@ -380,10 +441,10 @@ else:
# Linux/BSD/MacOS part
proc readDatagramLoop(udata: pointer) {.raises: [].}=
var raddr: TransportAddress
doAssert(not isNil(udata))
let transp = cast[DatagramTransport](udata)
let fd = SocketHandle(transp.fd)
let
transp = cast[DatagramTransport](udata)
fd = SocketHandle(transp.fd)
if int(fd) == 0:
## This situation can be happen, when there events present
## after transport was closed.
@ -398,9 +459,8 @@ else:
cast[ptr SockAddr](addr transp.raddr),
addr transp.ralen)
if res >= 0:
fromSAddr(addr transp.raddr, transp.ralen, raddr)
transp.buflen = res
asyncSpawn transp.function(transp, raddr)
asyncSpawn transp.function(transp, transp.getRemoteAddress())
else:
let err = osLastError()
case err
@ -409,14 +469,15 @@ else:
else:
transp.buflen = 0
transp.setReadError(err)
asyncSpawn transp.function(transp, raddr)
asyncSpawn transp.function(transp, transp.getRemoteAddress())
break
proc writeDatagramLoop(udata: pointer) =
var res: int
doAssert(not isNil(udata))
var transp = cast[DatagramTransport](udata)
let fd = SocketHandle(transp.fd)
let
transp = cast[DatagramTransport](udata)
fd = SocketHandle(transp.fd)
if int(fd) == 0:
## This situation can be happen, when there events present
## after transport was closed.
@ -428,7 +489,8 @@ else:
let vector = transp.queue.popFirst()
while true:
if vector.kind == WithAddress:
toSAddr(vector.address, transp.waddr, transp.walen)
# We only need `Sockaddr_storage` data here, so result discarded.
discard transp.setRemoteAddress(vector.address)
res = osdefs.sendto(fd, vector.buf, vector.buflen, MSG_NOSIGNAL,
cast[ptr SockAddr](addr transp.waddr),
transp.walen)
@ -551,21 +613,28 @@ else:
closeSocket(localSock)
raiseTransportOsError(err)
res.flags =
block:
# Add `V4Mapped` flag when `::` address is used and dualstack is
# set to enabled or auto.
var res = flags
if (local.family == AddressFamily.IPv6) and local.isAnyLocal():
if dualstack != DualStackType.Disabled:
res.incl(ServerFlags.V4Mapped)
res
if remote.port != Port(0):
var saddr: Sockaddr_storage
var slen: SockLen
toSAddr(remote, saddr, slen)
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
let remoteAddress = res.setRemoteAddress(remote)
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr res.waddr),
res.walen) != 0:
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
res.remote = remote
res.remote = remoteAddress
res.fd = localSock
res.function = cbproc
res.flags = flags
res.buffer = newSeq[byte](bufferSize)
res.queue = initDeque[GramVector]()
res.udata = udata
@ -605,6 +674,24 @@ proc close*(transp: DatagramTransport) =
transp.state.incl({WriteClosed, ReadClosed})
closeSocket(transp.fd, continuation)
proc getTransportAddresses(
local, remote: Opt[IpAddress],
localPort, remotePort: Port
): tuple[local: TransportAddress, remote: TransportAddress] =
let
(localAuto, remoteAuto) = getAutoAddresses(localPort, remotePort)
lres =
if local.isSome():
initTAddress(local.get(), localPort)
else:
localAuto
rres =
if remote.isSome():
initTAddress(remote.get(), remotePort)
else:
remoteAuto
(lres, rres)
proc newDatagramTransportCommon(cbproc: UnsafeDatagramCallback,
remote: TransportAddress,
local: TransportAddress,
@ -633,7 +720,7 @@ proc newDatagramTransportCommon(cbproc: UnsafeDatagramCallback,
proc wrap(transp: DatagramTransport,
remote: TransportAddress) {.async: (raises: []).} =
try:
cbproc(transp, remote)
await cbproc(transp, remote)
except CatchableError as exc:
raiseAssert "Unexpected exception from stream server cbproc: " & exc.msg
@ -824,24 +911,96 @@ proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback,
cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc newDatagramTransport*(cbproc: DatagramCallback,
localPort: Port,
remotePort: Port,
local: Opt[IpAddress] = Opt.none(IpAddress),
remote: Opt[IpAddress] = Opt.none(IpAddress),
flags: set[ServerFlags] = {},
udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
## Create new UDP datagram transport (IPv6) and bind it to ANY_ADDRESS.
## Depending on OS settings procedure perform an attempt to create transport
## using IPv6 ANY_ADDRESS, if its not available it will try to bind transport
## to IPv4 ANY_ADDRESS.
##
## ``cbproc`` - callback which will be called, when new datagram received.
## ``localPort`` - local peer's port number.
## ``remotePort`` - remote peer's port number.
## ``local`` - optional local peer's IPv4/IPv6 address.
## ``remote`` - optional remote peer's IPv4/IPv6 address.
## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer.
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
let
(localHost, remoteHost) =
getTransportAddresses(local, remote, localPort, remotePort)
newDatagramTransportCommon(cbproc, remoteHost, localHost, asyncInvalidSocket,
flags, cast[pointer](udata), child, bufSize,
ttl, dualstack)
proc newDatagramTransport*(cbproc: DatagramCallback,
localPort: Port,
local: Opt[IpAddress] = Opt.none(IpAddress),
flags: set[ServerFlags] = {},
udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
newDatagramTransport(cbproc, localPort, Port(0), local, Opt.none(IpAddress),
flags, udata, child, bufSize, ttl, dualstack)
proc newDatagramTransport*[T](cbproc: DatagramCallback,
localPort: Port,
remotePort: Port,
local: Opt[IpAddress] = Opt.none(IpAddress),
remote: Opt[IpAddress] = Opt.none(IpAddress),
flags: set[ServerFlags] = {},
udata: ref T,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
let
(localHost, remoteHost) =
getTransportAddresses(local, remote, localPort, remotePort)
fflags = flags + {GCUserData}
GC_ref(udata)
newDatagramTransportCommon(cbproc, remoteHost, localHost, asyncInvalidSocket,
fflags, cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc newDatagramTransport*[T](cbproc: DatagramCallback,
localPort: Port,
local: Opt[IpAddress] = Opt.none(IpAddress),
flags: set[ServerFlags] = {},
udata: ref T,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
newDatagramTransport(cbproc, localPort, Port(0), local, Opt.none(IpAddress),
flags, udata, child, bufSize, ttl, dualstack)
proc join*(transp: DatagramTransport): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Wait until the transport ``transp`` will be closed.
let retFuture = newFuture[void]("datagram.transport.join")
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
transp.future.removeCallback(continuation, cast[pointer](retFuture))
if not(transp.future.finished()):
transp.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
else:
retFuture.complete()
return retFuture
transp.future.join()
proc closed*(transp: DatagramTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
@ -873,7 +1032,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc send*(transp: DatagramTransport, msg: sink string,
proc send*(transp: DatagramTransport, msg: string,
msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination
@ -882,7 +1041,7 @@ proc send*(transp: DatagramTransport, msg: sink string,
transp.checkClosed(retFuture)
let length = if msglen <= 0: len(msg) else: msglen
var localCopy = chronosMoveSink(msg)
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithoutAddress, buf: baseAddr localCopy,
@ -896,16 +1055,16 @@ proc send*(transp: DatagramTransport, msg: sink string,
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc send*[T](transp: DatagramTransport, msg: sink seq[T],
proc send*[T](transp: DatagramTransport, msg: seq[T],
msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
let retFuture = newFuture[void]("datagram.transport.send(seq)")
transp.checkClosed(retFuture)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var localCopy = chronosMoveSink(msg)
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithoutAddress, buf: baseAddr localCopy,
@ -935,7 +1094,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
return retFuture
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
msg: sink string, msglen = -1): Future[void] {.
msg: string, msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
@ -943,7 +1102,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
transp.checkClosed(retFuture)
let length = if msglen <= 0: len(msg) else: msglen
var localCopy = chronosMoveSink(msg)
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithAddress, buf: baseAddr localCopy,
@ -958,14 +1117,14 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
return retFuture
proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
msg: sink seq[T], msglen = -1): Future[void] {.
msg: seq[T], msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send sequence ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
let retFuture = newFuture[void]("datagram.transport.sendTo(seq)")
transp.checkClosed(retFuture)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var localCopy = chronosMoveSink(msg)
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithAddress, buf: baseAddr localCopy,

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,9 @@ Async/await is a programming model that relies on cooperative multitasking to
coordinate the concurrent execution of procedures, using event notifications
from the operating system or other treads to resume execution.
Code execution happens in a loop that alternates between making progress on
tasks and handling events.
<!-- toc -->
## The dispatcher
@ -118,7 +121,8 @@ The `CancelledError` will now travel up the stack like any other exception.
It can be caught for instance to free some resources and is then typically
re-raised for the whole chain operations to get cancelled.
Alternatively, the cancellation request can be translated to a regular outcome of the operation - for example, a `read` operation might return an empty result.
Alternatively, the cancellation request can be translated to a regular outcome
of the operation - for example, a `read` operation might return an empty result.
Cancelling an already-finished `Future` has no effect, as the following example
of downloading two web pages concurrently shows:
@ -127,8 +131,84 @@ of downloading two web pages concurrently shows:
{{#include ../examples/twogets.nim}}
```
### Ownership
When calling a procedure that returns a `Future`, ownership of that `Future` is
shared between the callee that created it and the caller that waits for it to be
finished.
The `Future` can be thought of as a single-item channel between a producer and a
consumer. The producer creates the `Future` and is responsible for completing or
failing it while the caller waits for completion and may `cancel` it.
Although it is technically possible, callers must not `complete` or `fail`
futures and callees or other intermediate observers must not `cancel` them as
this may lead to panics and shutdown (ie if the future is completed twice or a
cancalletion is not handled by the original caller).
### `noCancel`
Certain operations must not be cancelled for semantic reasons. Common scenarios
include `closeWait` that releases a resources irrevocably and composed
operations whose individual steps should be performed together or not at all.
In such cases, the `noCancel` modifier to `await` can be used to temporarily
disable cancellation propagation, allowing the operation to complete even if
the caller initiates a cancellation request:
```nim
proc deepSleep(dur: Duration) {.async.} =
# `noCancel` prevents any cancellation request by the caller of `deepSleep`
# from reaching `sleepAsync` - even if `deepSleep` is cancelled, its future
# will not complete until the sleep finishes.
await noCancel sleepAsync(dur)
let future = deepSleep(10.minutes)
# This will take ~10 minutes even if we try to cancel the call to `deepSleep`!
await cancelAndWait(future)
```
### `join`
The `join` modifier to `await` allows cancelling an `async` procedure without
propagating the cancellation to the awaited operation. This is useful when
`await`:ing a `Future` for monitoring purposes, ie when a procedure is not the
owner of the future that's being `await`:ed.
One situation where this happens is when implementing the "observer" pattern,
where a helper monitors an operation it did not initiate:
```nim
var tick: Future[void]
proc ticker() {.async.} =
while true:
tick = sleepAsync(1.second)
await tick
echo "tick!"
proc tocker() {.async.} =
# This operation does not own or implement the operation behind `tick`,
# so it should not cancel it when `tocker` is cancelled
await join tick
echo "tock!"
let
fut = ticker() # `ticker` is now looping and most likely waiting for `tick`
fut2 = tocker() # both `ticker` and `tocker` are waiting for `tick`
# We don't want `tocker` to cancel a future that was created in `ticker`
waitFor fut2.cancelAndWait()
waitFor fut # keeps printing `tick!` every second.
```
## Compile-time configuration
`chronos` contains several compile-time [configuration options](./chronos/config.nim) enabling stricter compile-time checks and debugging helpers whose runtime cost may be significant.
`chronos` contains several compile-time
[configuration options](./chronos/config.nim) enabling stricter compile-time
checks and debugging helpers whose runtime cost may be significant.
Strictness options generally will become default in future chronos releases and allow adapting existing code without changing the new version - see the [`config.nim`](./chronos/config.nim) module for more information.
Strictness options generally will become default in future chronos releases and
allow adapting existing code without changing the new version - see the
[`config.nim`](./chronos/config.nim) module for more information.

View File

@ -110,7 +110,7 @@ sometimes lead to compile errors around forward declarations, methods and
closures as Nim conservatively asssumes that any `Exception` might be raised
from those.
Make sure to excplicitly annotate these with `{.raises.}`:
Make sure to explicitly annotate these with `{.raises.}`:
```nim
# Forward declarations need to explicitly include a raises list:
@ -124,11 +124,12 @@ proc myfunction() =
let closure: MyClosure = myfunction
```
## Compatibility modes
For compatibility, `async` functions can be instructed to handle `Exception` as
well, specifying `handleException: true`. `Exception` that is not a `Defect` and
not a `CatchableError` will then be caught and remapped to
`AsyncExceptionError`:
**Individual functions.** For compatibility, `async` functions can be instructed
to handle `Exception` as well, specifying `handleException: true`. Any
`Exception` that is not a `Defect` and not a `CatchableError` will then be
caught and remapped to `AsyncExceptionError`:
```nim
proc raiseException() {.async: (handleException: true, raises: [AsyncExceptionError]).} =
@ -136,14 +137,32 @@ proc raiseException() {.async: (handleException: true, raises: [AsyncExceptionEr
proc callRaiseException() {.async: (raises: []).} =
try:
raiseException()
await raiseException()
except AsyncExceptionError as exc:
# The original Exception is available from the `parent` field
echo exc.parent.msg
```
This mode can be enabled globally with `-d:chronosHandleException` as a help
when porting code to `chronos` but should generally be avoided as global
configuration settings may interfere with libraries that use `chronos` leading
to unexpected behavior.
**Global flag.** This mode can be enabled globally with
`-d:chronosHandleException` as a help when porting code to `chronos`. The
behavior in this case will be that:
1. old-style functions annotated with plain `async` will behave as if they had
been annotated with `async: (handleException: true)`.
This is functionally equivalent to
`async: (handleException: true, raises: [CatchableError])` and will, as
before, remap any `Exception` that is not `Defect` into
`AsyncExceptionError`, while also allowing any `CatchableError` (including
`AsyncExceptionError`) to get through without compilation errors.
2. New-style functions with `async: (raises: [...])` annotations or their own
`handleException` annotations will not be affected.
The rationale here is to allow one to incrementally introduce exception
annotations and get compiler feedback while not requiring that every bit of
legacy code is updated at once.
This should be used sparingly and with care, however, as global configuration
settings may interfere with libraries that use `chronos` leading to unexpected
behavior.

View File

@ -135,6 +135,16 @@ suite "Asynchronous issues test suite":
await server.closeWait()
return true
proc testOrDeadlock(): Future[bool] {.async.} =
proc f(): Future[void] {.async.} =
await sleepAsync(2.seconds) or sleepAsync(1.seconds)
let fx = f()
try:
await fx.cancelAndWait().wait(2.seconds)
except AsyncTimeoutError:
return false
true
test "Issue #6":
check waitFor(issue6()) == true
@ -152,3 +162,6 @@ suite "Asynchronous issues test suite":
test "IndexError crash test":
check waitFor(testIndexError()) == true
test "`or` deadlock [#516] test":
check waitFor(testOrDeadlock()) == true

View File

@ -32,6 +32,10 @@ suite "Datagram Transport test suite":
m8 = "Bounded multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
type
DatagramSocketType {.pure.} = enum
Bound, Unbound
proc client1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
@ -628,6 +632,243 @@ suite "Datagram Transport test suite":
await allFutures(sdgram.closeWait(), cdgram.closeWait())
res == 1
proc performAutoAddressTest(port: Port,
family: AddressFamily): Future[bool] {.async.} =
var
expectRequest1 = "AUTO REQUEST1"
expectRequest2 = "AUTO REQUEST2"
expectResponse = "AUTO RESPONSE"
mappedResponse = "MAPPED RESPONSE"
event = newAsyncEvent()
event2 = newAsyncEvent()
res = 0
proc process1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.
async: (raises: []).} =
try:
var
bmsg = transp.getMessage()
smsg = string.fromBytes(bmsg)
if smsg == expectRequest1:
inc(res)
await noCancel transp.sendTo(
raddr, addr expectResponse[0], len(expectResponse))
elif smsg == expectRequest2:
inc(res)
await noCancel transp.sendTo(
raddr, addr mappedResponse[0], len(mappedResponse))
except TransportError as exc:
raiseAssert exc.msg
except CancelledError as exc:
raiseAssert exc.msg
proc process2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.
async: (raises: []).} =
try:
var
bmsg = transp.getMessage()
smsg = string.fromBytes(bmsg)
if smsg == expectResponse:
inc(res)
event.fire()
except TransportError as exc:
raiseAssert exc.msg
except CancelledError as exc:
raiseAssert exc.msg
proc process3(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.
async: (raises: []).} =
try:
var
bmsg = transp.getMessage()
smsg = string.fromBytes(bmsg)
if smsg == mappedResponse:
inc(res)
event2.fire()
except TransportError as exc:
raiseAssert exc.msg
except CancelledError as exc:
raiseAssert exc.msg
let sdgram =
block:
var res: DatagramTransport
var currentPort = port
for i in 0 ..< 10:
res =
try:
newDatagramTransport(process1, currentPort,
flags = {ServerFlags.ReusePort})
except TransportOsError:
echo "Unable to create transport on port ", currentPort
currentPort = Port(uint16(currentPort) + 1'u16)
nil
if not(isNil(res)):
break
doAssert(not(isNil(res)), "Unable to create transport, giving up")
res
var
address =
case family
of AddressFamily.IPv4:
initTAddress("127.0.0.1:0")
of AddressFamily.IPv6:
initTAddress("::1:0")
of AddressFamily.Unix, AddressFamily.None:
raiseAssert "Not allowed"
let
cdgram =
case family
of AddressFamily.IPv4:
newDatagramTransport(process2, local = address)
of AddressFamily.IPv6:
newDatagramTransport6(process2, local = address)
of AddressFamily.Unix, AddressFamily.None:
raiseAssert "Not allowed"
address.port = sdgram.localAddress().port
try:
await noCancel cdgram.sendTo(
address, addr expectRequest1[0], len(expectRequest1))
except TransportError:
discard
if family == AddressFamily.IPv6:
var remote = initTAddress("127.0.0.1:0")
remote.port = sdgram.localAddress().port
let wtransp =
newDatagramTransport(process3, local = initTAddress("0.0.0.0:0"))
try:
await noCancel wtransp.sendTo(
remote, addr expectRequest2[0], len(expectRequest2))
except TransportError as exc:
raiseAssert "Got transport error, reason = " & $exc.msg
try:
await event2.wait().wait(1.seconds)
except CatchableError:
discard
await wtransp.closeWait()
try:
await event.wait().wait(1.seconds)
except CatchableError:
discard
await allFutures(sdgram.closeWait(), cdgram.closeWait())
if family == AddressFamily.IPv4:
res == 2
else:
res == 4
proc performAutoAddressTest2(
address1: Opt[IpAddress],
address2: Opt[IpAddress],
port: Port,
sendType: AddressFamily,
boundType: DatagramSocketType
): Future[bool] {.async.} =
let
expectRequest = "TEST REQUEST"
expectResponse = "TEST RESPONSE"
event = newAsyncEvent()
var res = 0
proc process1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.
async: (raises: []).} =
if raddr.family != sendType:
raiseAssert "Incorrect address family received [" & $raddr &
"], expected [" & $sendType & "]"
try:
let
bmsg = transp.getMessage()
smsg = string.fromBytes(bmsg)
if smsg == expectRequest:
inc(res)
await noCancel transp.sendTo(
raddr, unsafeAddr expectResponse[0], len(expectResponse))
except TransportError as exc:
raiseAssert exc.msg
except CancelledError as exc:
raiseAssert exc.msg
proc process2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.
async: (raises: []).} =
if raddr.family != sendType:
raiseAssert "Incorrect address family received [" & $raddr &
"], expected [" & $sendType & "]"
try:
let
bmsg = transp.getMessage()
smsg = string.fromBytes(bmsg)
if smsg == expectResponse:
inc(res)
event.fire()
except TransportError as exc:
raiseAssert exc.msg
except CancelledError as exc:
raiseAssert exc.msg
let
serverFlags = {ServerFlags.ReuseAddr}
server = newDatagramTransport(process1, flags = serverFlags,
local = address1, localPort = port)
serverAddr = server.localAddress()
serverPort = serverAddr.port
remoteAddress =
case sendType
of AddressFamily.IPv4:
var res = initTAddress("127.0.0.1:0")
res.port = serverPort
res
of AddressFamily.IPv6:
var res = initTAddress("[::1]:0")
res.port = serverPort
res
else:
raiseAssert "Incorrect sending type"
remoteIpAddress = Opt.some(remoteAddress.toIpAddress())
client =
case boundType
of DatagramSocketType.Bound:
newDatagramTransport(process2,
localPort = Port(0), remotePort = serverPort,
local = address2, remote = remoteIpAddress)
of DatagramSocketType.Unbound:
newDatagramTransport(process2,
localPort = Port(0), remotePort = Port(0),
local = address2)
try:
case boundType
of DatagramSocketType.Bound:
await noCancel client.send(
unsafeAddr expectRequest[0], len(expectRequest))
of DatagramSocketType.Unbound:
await noCancel client.sendTo(remoteAddress,
unsafeAddr expectRequest[0], len(expectRequest))
except TransportError as exc:
raiseAssert "Could not send datagram to remote peer, reason = " & $exc.msg
try:
await event.wait().wait(1.seconds)
except CatchableError:
discard
await allFutures(server.closeWait(), client.closeWait())
res == 2
test "close(transport) test":
check waitFor(testTransportClose()) == true
test m1:
@ -730,3 +971,104 @@ suite "Datagram Transport test suite":
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] Auto-address constructor test (*:0)":
if isAvailable(AddressFamily.IPv6):
check:
(await performAutoAddressTest(Port(0), AddressFamily.IPv6)) == true
# If IPv6 is available newAutoDatagramTransport should bind to `::` - this
# means that we should be able to connect to it via IPV4_MAPPED address,
# but only when IPv4 is also available.
if isAvailable(AddressFamily.IPv4):
check:
(await performAutoAddressTest(Port(0), AddressFamily.IPv4)) == true
else:
# If IPv6 is not available newAutoDatagramTransport should bind to
# `0.0.0.0` - this means we should be able to connect to it via IPv4
# address.
if isAvailable(AddressFamily.IPv4):
check:
(await performAutoAddressTest(Port(0), AddressFamily.IPv4)) == true
asyncTest "[IP] Auto-address constructor test (*:30231)":
if isAvailable(AddressFamily.IPv6):
check:
(await performAutoAddressTest(Port(30231), AddressFamily.IPv6)) == true
# If IPv6 is available newAutoDatagramTransport should bind to `::` - this
# means that we should be able to connect to it via IPV4_MAPPED address,
# but only when IPv4 is also available.
if isAvailable(AddressFamily.IPv4):
check:
(await performAutoAddressTest(Port(30231), AddressFamily.IPv4)) ==
true
else:
# If IPv6 is not available newAutoDatagramTransport should bind to
# `0.0.0.0` - this means we should be able to connect to it via IPv4
# address.
if isAvailable(AddressFamily.IPv4):
check:
(await performAutoAddressTest(Port(30231), AddressFamily.IPv4)) ==
true
for socketType in DatagramSocketType:
for portNumber in [Port(0), Port(30231)]:
asyncTest "[IP] IPv6 mapping test (" & $socketType &
"/auto-auto:" & $int(portNumber) & ")":
if isAvailable(AddressFamily.IPv6):
let
address1 = Opt.none(IpAddress)
address2 = Opt.none(IpAddress)
check:
(await performAutoAddressTest2(
address1, address2, portNumber, AddressFamily.IPv4, socketType))
(await performAutoAddressTest2(
address1, address2, portNumber, AddressFamily.IPv6, socketType))
else:
skip()
asyncTest "[IP] IPv6 mapping test (" & $socketType &
"/auto-ipv6:" & $int(portNumber) & ")":
if isAvailable(AddressFamily.IPv6):
let
address1 = Opt.none(IpAddress)
address2 = Opt.some(initTAddress("[::1]:0").toIpAddress())
check:
(await performAutoAddressTest2(
address1, address2, portNumber, AddressFamily.IPv6, socketType))
else:
skip()
asyncTest "[IP] IPv6 mapping test (" & $socketType &
"/auto-ipv4:" & $int(portNumber) & ")":
if isAvailable(AddressFamily.IPv6):
let
address1 = Opt.none(IpAddress)
address2 = Opt.some(initTAddress("127.0.0.1:0").toIpAddress())
check:
(await performAutoAddressTest2(address1, address2, portNumber,
AddressFamily.IPv4, socketType))
else:
skip()
asyncTest "[IP] IPv6 mapping test (" & $socketType &
"/ipv6-auto:" & $int(portNumber) & ")":
if isAvailable(AddressFamily.IPv6):
let
address1 = Opt.some(initTAddress("[::1]:0").toIpAddress())
address2 = Opt.none(IpAddress)
check:
(await performAutoAddressTest2(address1, address2, portNumber,
AddressFamily.IPv6, socketType))
else:
skip()
asyncTest "[IP] IPv6 mapping test (" & $socketType &
"/ipv4-auto:" & $int(portNumber) & ")":
if isAvailable(AddressFamily.IPv6):
let
address1 = Opt.some(initTAddress("127.0.0.1:0").toIpAddress())
address2 = Opt.none(IpAddress)
check:
(await performAutoAddressTest2(address1, address2, portNumber,
AddressFamily.IPv4, socketType))
else:
skip()

View File

@ -6,7 +6,7 @@
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest2
import stew/results
import results
import ../chronos, ../chronos/unittest2/asynctests
{.used.}
@ -83,7 +83,7 @@ suite "Future[T] behavior test suite":
fut.finished
testResult == "1245"
asyncTest "wait[T]() test":
asyncTest "wait(duration) test":
block:
## Test for not immediately completed future and timeout = -1
let res =
@ -146,6 +146,183 @@ suite "Future[T] behavior test suite":
false
check res
asyncTest "wait(future) test":
block:
## Test for not immediately completed future and deadline which is not
## going to be finished
let
deadline = newFuture[void]()
future1 = testFuture1()
let res =
try:
discard await wait(future1, deadline)
true
except CatchableError:
false
check:
deadline.finished() == false
future1.finished() == true
res == true
await deadline.cancelAndWait()
check deadline.finished() == true
block:
## Test for immediately completed future and timeout = -1
let
deadline = newFuture[void]()
future2 = testFuture2()
let res =
try:
discard await wait(future2, deadline)
true
except CatchableError:
false
check:
deadline.finished() == false
future2.finished() == true
res
await deadline.cancelAndWait()
check deadline.finished() == true
block:
## Test for not immediately completed future and timeout = 0
let
deadline = newFuture[void]()
future1 = testFuture1()
deadline.complete()
let res =
try:
discard await wait(future1, deadline)
false
except AsyncTimeoutError:
true
except CatchableError:
false
check:
future1.finished() == false
deadline.finished() == true
res
block:
## Test for immediately completed future and timeout = 0
let
deadline = newFuture[void]()
future2 = testFuture2()
deadline.complete()
let (res1, res2) =
try:
let res = await wait(future2, deadline)
(true, res)
except CatchableError:
(false, -1)
check:
future2.finished() == true
deadline.finished() == true
res1 == true
res2 == 1
block:
## Test for future which cannot be completed in timeout period
let
deadline = sleepAsync(50.milliseconds)
future100 = testFuture100()
let res =
try:
discard await wait(future100, deadline)
false
except AsyncTimeoutError:
true
except CatchableError:
false
check:
deadline.finished() == true
res
await future100.cancelAndWait()
check:
future100.finished() == true
block:
## Test for future which will be completed before timeout exceeded.
let
deadline = sleepAsync(500.milliseconds)
future100 = testFuture100()
let (res1, res2) =
try:
let res = await wait(future100, deadline)
(true, res)
except CatchableError:
(false, -1)
check:
future100.finished() == true
deadline.finished() == false
res1 == true
res2 == 0
await deadline.cancelAndWait()
check:
deadline.finished() == true
asyncTest "wait(future) cancellation behavior test":
proc deepTest3(future: Future[void]) {.async.} =
await future
proc deepTest2(future: Future[void]) {.async.} =
await deepTest3(future)
proc deepTest1(future: Future[void]) {.async.} =
await deepTest2(future)
let
deadlineFuture = newFuture[void]()
block:
# Cancellation should affect `testFuture` because it is in pending state.
let monitorFuture = newFuture[void]()
var testFuture = deepTest1(monitorFuture)
let waitFut = wait(testFuture, deadlineFuture)
await cancelAndWait(waitFut)
check:
monitorFuture.cancelled() == true
testFuture.cancelled() == true
waitFut.cancelled() == true
deadlineFuture.finished() == false
block:
# Cancellation should not affect `testFuture` because it is completed.
let monitorFuture = newFuture[void]()
var testFuture = deepTest1(monitorFuture)
let waitFut = wait(testFuture, deadlineFuture)
monitorFuture.complete()
await cancelAndWait(waitFut)
check:
monitorFuture.completed() == true
monitorFuture.cancelled() == false
testFuture.completed() == true
waitFut.completed() == true
deadlineFuture.finished() == false
block:
# Cancellation should not affect `testFuture` because it is failed.
let monitorFuture = newFuture[void]()
var testFuture = deepTest1(monitorFuture)
let waitFut = wait(testFuture, deadlineFuture)
monitorFuture.fail(newException(ValueError, "TEST"))
await cancelAndWait(waitFut)
check:
monitorFuture.failed() == true
monitorFuture.cancelled() == false
testFuture.failed() == true
testFuture.cancelled() == false
waitFut.failed() == true
testFuture.cancelled() == false
deadlineFuture.finished() == false
await cancelAndWait(deadlineFuture)
check deadlineFuture.finished() == true
asyncTest "Discarded result Future[T] test":
var completedFutures = 0
@ -704,6 +881,14 @@ suite "Future[T] behavior test suite":
not(fut2.failed())
fut2.read() == f21
asyncTest "one() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
one(f).cancelSoon()
await checkraises()
asyncTest "or() test":
proc client1() {.async.} =
await sleepAsync(200.milliseconds)
@ -1074,7 +1259,7 @@ suite "Future[T] behavior test suite":
completed == 0
cancelled == 1
asyncTest "Cancellation wait() test":
asyncTest "Cancellation wait(duration) test":
var neverFlag1, neverFlag2, neverFlag3: bool
var waitProc1, waitProc2: bool
proc neverEndingProc(): Future[void] =
@ -1135,7 +1320,39 @@ suite "Future[T] behavior test suite":
fut.state == FutureState.Completed
neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2
asyncTest "Cancellation race test":
asyncTest "Cancellation wait(future) test":
var neverFlag1, neverFlag2, neverFlag3: bool
var waitProc1, waitProc2: bool
proc neverEndingProc(): Future[void] =
var res = newFuture[void]()
proc continuation(udata: pointer) {.gcsafe.} =
neverFlag2 = true
proc cancellation(udata: pointer) {.gcsafe.} =
neverFlag3 = true
res.addCallback(continuation)
res.cancelCallback = cancellation
result = res
neverFlag1 = true
proc waitProc() {.async.} =
let deadline = sleepAsync(100.milliseconds)
try:
await wait(neverEndingProc(), deadline)
except CancelledError:
waitProc1 = true
except CatchableError:
doAssert(false)
finally:
await cancelAndWait(deadline)
waitProc2 = true
var fut = waitProc()
await cancelAndWait(fut)
check:
fut.state == FutureState.Completed
neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2
asyncTest "Cancellation race() test":
var someFut = newFuture[void]()
proc raceProc(): Future[void] {.async.} =
@ -1220,7 +1437,10 @@ suite "Future[T] behavior test suite":
test "location test":
# WARNING: This test is very sensitive to line numbers and module name.
template start(): int =
instantiationInfo().line
const first = start()
proc macroFuture() {.async.} =
let someVar {.used.} = 5 # LINE POSITION 1
let someOtherVar {.used.} = 4
@ -1258,12 +1478,12 @@ suite "Future[T] behavior test suite":
(loc.procedure == procedure)
check:
chk(loc10, "testfut.nim", 1225, "macroFuture")
chk(loc11, "testfut.nim", 1228, "")
chk(loc20, "testfut.nim", 1237, "template")
chk(loc21, "testfut.nim", 1240, "")
chk(loc30, "testfut.nim", 1234, "procedure")
chk(loc31, "testfut.nim", 1241, "")
chk(loc10, "testfut.nim", first + 2, "macroFuture")
chk(loc11, "testfut.nim", first + 5, "")
chk(loc20, "testfut.nim", first + 14, "template")
chk(loc21, "testfut.nim", first + 17, "")
chk(loc30, "testfut.nim", first + 11, "procedure")
chk(loc31, "testfut.nim", first + 18, "")
asyncTest "withTimeout(fut) should wait cancellation test":
proc futureNeverEnds(): Future[void] =
@ -1287,7 +1507,7 @@ suite "Future[T] behavior test suite":
false
check res
asyncTest "wait(fut) should wait cancellation test":
asyncTest "wait(future) should wait cancellation test":
proc futureNeverEnds(): Future[void] =
newFuture[void]("neverending.future")
@ -1311,6 +1531,29 @@ suite "Future[T] behavior test suite":
check res
asyncTest "wait(future) should wait cancellation test":
proc futureNeverEnds(): Future[void] =
newFuture[void]("neverending.future")
proc futureOneLevelMore() {.async.} =
await futureNeverEnds()
var fut = futureOneLevelMore()
let res =
try:
await wait(fut, sleepAsync(100.milliseconds))
false
except AsyncTimeoutError:
# Because `fut` is never-ending Future[T], `wait` should raise
# `AsyncTimeoutError`, but only after `fut` is cancelled.
if fut.cancelled():
true
else:
false
except CatchableError:
false
check res
test "race(zero) test":
var tseq = newSeq[FutureBase]()
var fut1 = race(tseq)
@ -1507,6 +1750,14 @@ suite "Future[T] behavior test suite":
f2.finished()
f3.finished()
asyncTest "race() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
race(f).cancelSoon()
await checkraises()
test "Unsigned integer overflow test":
check:
0xFFFF_FFFF_FFFF_FFFF'u64 + 1'u64 == 0'u64
@ -1544,7 +1795,7 @@ suite "Future[T] behavior test suite":
v1_u == 0'u
v2_u + 1'u == 0'u
asyncTest "wait() cancellation undefined behavior test #1":
asyncTest "wait(duration) cancellation undefined behavior test #1":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await fooFut
@ -1567,7 +1818,7 @@ suite "Future[T] behavior test suite":
discard someFut.tryCancel()
await someFut
asyncTest "wait() cancellation undefined behavior test #2":
asyncTest "wait(duration) cancellation undefined behavior test #2":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await fooFut
@ -1594,6 +1845,19 @@ suite "Future[T] behavior test suite":
discard someFut.tryCancel()
await someFut
asyncTest "wait(duration) should allow cancellation test (depends on race())":
proc testFoo(): Future[bool] {.async.} =
let
resFut = sleepAsync(2.seconds).wait(3.seconds)
timeFut = sleepAsync(1.seconds)
cancelFut = cancelAndWait(resFut)
discard await race(cancelFut, timeFut)
if cancelFut.finished():
return (resFut.cancelled() and cancelFut.completed())
false
check (await testFoo()) == true
asyncTest "withTimeout() cancellation undefined behavior test #1":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
@ -1654,6 +1918,91 @@ suite "Future[T] behavior test suite":
discard someFut.tryCancel()
await someFut
asyncTest "withTimeout() should allow cancellation test (depends on race())":
proc testFoo(): Future[bool] {.async.} =
let
resFut = sleepAsync(2.seconds).withTimeout(3.seconds)
timeFut = sleepAsync(1.seconds)
cancelFut = cancelAndWait(resFut)
discard await race(cancelFut, timeFut)
if cancelFut.finished():
return (resFut.cancelled() and cancelFut.completed())
false
check (await testFoo()) == true
asyncTest "wait(future) cancellation undefined behavior test #1":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await fooFut
return TestFooConnection()
proc testFoo(fooFut: Future[void]) {.async.} =
let deadline = sleepAsync(10.seconds)
let connection =
try:
let res = await testInnerFoo(fooFut).wait(deadline)
Result[TestFooConnection, int].ok(res)
except CancelledError:
Result[TestFooConnection, int].err(0)
except CatchableError:
Result[TestFooConnection, int].err(1)
finally:
await deadline.cancelAndWait()
check connection.isOk()
var future = newFuture[void]("last.child.future")
var someFut = testFoo(future)
future.complete()
discard someFut.tryCancel()
await someFut
asyncTest "wait(future) cancellation undefined behavior test #2":
proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await fooFut
return TestFooConnection()
proc testMiddleFoo(fooFut: Future[void]): Future[TestFooConnection] {.
async.} =
await testInnerFoo(fooFut)
proc testFoo(fooFut: Future[void]) {.async.} =
let deadline = sleepAsync(10.seconds)
let connection =
try:
let res = await testMiddleFoo(fooFut).wait(deadline)
Result[TestFooConnection, int].ok(res)
except CancelledError:
Result[TestFooConnection, int].err(0)
except CatchableError:
Result[TestFooConnection, int].err(1)
finally:
await deadline.cancelAndWait()
check connection.isOk()
var future = newFuture[void]("last.child.future")
var someFut = testFoo(future)
future.complete()
discard someFut.tryCancel()
await someFut
asyncTest "wait(future) should allow cancellation test (depends on race())":
proc testFoo(): Future[bool] {.async.} =
let
deadline = sleepAsync(3.seconds)
resFut = sleepAsync(2.seconds).wait(deadline)
timeFut = sleepAsync(1.seconds)
cancelFut = cancelAndWait(resFut)
discard await race(cancelFut, timeFut)
await deadline.cancelAndWait()
if cancelFut.finished():
return (resFut.cancelled() and cancelFut.completed())
false
check (await testFoo()) == true
asyncTest "Cancellation behavior test":
proc testInnerFoo(fooFut: Future[void]) {.async.} =
await fooFut
@ -2002,9 +2351,348 @@ suite "Future[T] behavior test suite":
check:
future1.cancelled() == true
future2.cancelled() == true
asyncTest "join() test":
proc joinFoo0(future: FutureBase) {.async.} =
await join(future)
proc joinFoo1(future: Future[void]) {.async.} =
await join(future)
proc joinFoo2(future: Future[void]) {.
async: (raises: [CancelledError]).} =
await join(future)
let
future0 = newFuture[void]()
future1 = newFuture[void]()
future2 = Future[void].Raising([CancelledError]).init()
let
resfut0 = joinFoo0(future0)
resfut1 = joinFoo1(future1)
resfut2 = joinFoo2(future2)
check:
resfut0.finished() == false
resfut1.finished() == false
resfut2.finished() == false
future0.complete()
future1.complete()
future2.complete()
let res =
try:
await noCancel allFutures(resfut0, resfut1, resfut2).wait(1.seconds)
true
except AsyncTimeoutError:
false
check:
res == true
resfut0.finished() == true
resfut1.finished() == true
resfut2.finished() == true
future0.finished() == true
future1.finished() == true
future2.finished() == true
asyncTest "join() cancellation test":
proc joinFoo0(future: FutureBase) {.async.} =
await join(future)
proc joinFoo1(future: Future[void]) {.async.} =
await join(future)
proc joinFoo2(future: Future[void]) {.
async: (raises: [CancelledError]).} =
await join(future)
let
future0 = newFuture[void]()
future1 = newFuture[void]()
future2 = Future[void].Raising([CancelledError]).init()
let
resfut0 = joinFoo0(future0)
resfut1 = joinFoo1(future1)
resfut2 = joinFoo2(future2)
check:
resfut0.finished() == false
resfut1.finished() == false
resfut2.finished() == false
let
cancelfut0 = cancelAndWait(resfut0)
cancelfut1 = cancelAndWait(resfut1)
cancelfut2 = cancelAndWait(resfut2)
let res =
try:
await noCancel allFutures(cancelfut0, cancelfut1,
cancelfut2).wait(1.seconds)
true
except AsyncTimeoutError:
false
check:
res == true
cancelfut0.finished() == true
cancelfut1.finished() == true
cancelfut2.finished() == true
resfut0.cancelled() == true
resfut1.cancelled() == true
resfut2.cancelled() == true
future0.finished() == false
future1.finished() == false
future2.finished() == false
future0.complete()
future1.complete()
future2.complete()
check:
future0.finished() == true
future1.finished() == true
future2.finished() == true
test "Sink with literals":
# https://github.com/nim-lang/Nim/issues/22175
let fut = newFuture[string]()
fut.complete("test")
check:
fut.value() == "test"
test "Raising type matching":
type X[E] = Future[void].Raising(E)
proc f(x: X) = discard
var v: Future[void].Raising([ValueError])
f(v)
type Object = object
# TODO cannot use X[[ValueError]] here..
field: Future[void].Raising([ValueError])
discard Object(field: v)
check:
not compiles(Future[void].Raising([42]))
not compiles(Future[void].Raising(42))
asyncTest "Timeout/cancellation race wait(duration) test":
proc raceTest(T: typedesc, itype: int) {.async.} =
let monitorFuture = newFuture[T]("monitor",
{FutureFlag.OwnCancelSchedule})
proc raceProc0(future: Future[T]): Future[T] {.async.} =
await future
proc raceProc1(future: Future[T]): Future[T] {.async.} =
await raceProc0(future)
proc raceProc2(future: Future[T]): Future[T] {.async.} =
await raceProc1(future)
proc activation(udata: pointer) {.gcsafe.} =
if itype == 0:
when T is void:
monitorFuture.complete()
elif T is int:
monitorFuture.complete(100)
elif itype == 1:
monitorFuture.fail(newException(ValueError, "test"))
else:
monitorFuture.cancelAndSchedule()
monitorFuture.cancelCallback = activation
let
testFut = raceProc2(monitorFuture)
waitFut = wait(testFut, 10.milliseconds)
when T is void:
let waitRes =
try:
await waitFut
if itype == 0:
true
else:
false
except CancelledError:
false
except CatchableError:
if itype != 0:
true
else:
false
check waitRes == true
elif T is int:
let waitRes =
try:
let res = await waitFut
if itype == 0:
(true, res)
else:
(false, -1)
except CancelledError:
(false, -1)
except CatchableError:
if itype != 0:
(true, 0)
else:
(false, -1)
if itype == 0:
check:
waitRes[0] == true
waitRes[1] == 100
else:
check:
waitRes[0] == true
await raceTest(void, 0)
await raceTest(void, 1)
await raceTest(void, 2)
await raceTest(int, 0)
await raceTest(int, 1)
await raceTest(int, 2)
asyncTest "Timeout/cancellation race wait(future) test":
  # Exercises wait(future, deadlineFuture) when the monitored future settles
  # in the very same tick the deadline fires: completion (mode 0), failure
  # (mode 1) and cancellation (mode 2) must all be observable through the
  # wait() wrapper instead of being masked by the deadline.
  proc raceTest(T: typedesc, mode: int) {.async.} =
    let probeFut = newFuture[T]()
    # Three layers of async indirection so the deadline has to race against
    # a whole chain of pending futures, not a single one.
    proc layer0(fut: Future[T]): Future[T] {.async.} =
      await fut
    proc layer1(fut: Future[T]): Future[T] {.async.} =
      await layer0(fut)
    proc layer2(fut: Future[T]): Future[T] {.async.} =
      await layer1(fut)
    proc onDeadline(udata: pointer) {.gcsafe.} =
      # Settle the probe exactly when the deadline future completes.
      if mode == 0:
        when T is void:
          probeFut.complete()
        elif T is int:
          probeFut.complete(100)
      elif mode == 1:
        probeFut.fail(newException(ValueError, "test"))
      else:
        probeFut.cancelAndSchedule()
    let deadlineFuture = newFuture[void]()
    deadlineFuture.addCallback onDeadline
    let
      chainFut = layer2(probeFut)
      waitFut = wait(chainFut, deadlineFuture)
    deadlineFuture.complete()
    when T is void:
      let waitRes =
        try:
          await waitFut
          # Normal return is only expected for the completion scenario.
          mode == 0
        except CancelledError:
          false
        except CatchableError:
          # Failure (or the timeout error) is expected for modes 1 and 2.
          mode != 0
      check waitRes == true
    elif T is int:
      let waitRes =
        try:
          let res = await waitFut
          if mode == 0:
            (true, res)
          else:
            (false, -1)
        except CancelledError:
          (false, -1)
        except CatchableError:
          if mode != 0:
            (true, 0)
          else:
            (false, -1)
      if mode == 0:
        check:
          waitRes[0] == true
          waitRes[1] == 100
      else:
        check:
          waitRes[0] == true
  await raceTest(void, 0)
  await raceTest(void, 1)
  await raceTest(void, 2)
  await raceTest(int, 0)
  await raceTest(int, 1)
  await raceTest(int, 2)
asyncTest "Timeout/cancellation race withTimeout() test":
  # Races withTimeout() against a future that owns its cancellation schedule:
  # when the timeout cancels the chain, the installed cancel callback settles
  # the future instead (complete for mode 0, fail for mode 1) or lets the
  # cancellation through (mode 2). withTimeout() must report true for the
  # first two outcomes and false for the last one.
  proc raceTest(T: typedesc, mode: int) {.async.} =
    let probeFut = newFuture[T]("monitor",
                                {FutureFlag.OwnCancelSchedule})
    proc layer0(fut: Future[T]): Future[T] {.async.} =
      await fut
    proc layer1(fut: Future[T]): Future[T] {.async.} =
      await layer0(fut)
    proc layer2(fut: Future[T]): Future[T] {.async.} =
      await layer1(fut)
    proc onCancel(udata: pointer) {.gcsafe.} =
      # Invoked when the timeout tries to cancel the probe.
      if mode == 0:
        when T is void:
          probeFut.complete()
        elif T is int:
          probeFut.complete(100)
      elif mode == 1:
        probeFut.fail(newException(ValueError, "test"))
      else:
        probeFut.cancelAndSchedule()
    probeFut.cancelCallback = onCancel
    let
      chainFut = layer2(probeFut)
      waitFut = withTimeout(chainFut, 10.milliseconds)
    # withTimeout() yields Future[bool] regardless of T, so one shared
    # harness replaces the original duplicated void/int branches.
    let waitRes =
      try:
        await waitFut
      except CancelledError:
        false
      except CatchableError:
        false
    if mode < 2:
      check waitRes == true
    else:
      check waitRes == false
  await raceTest(void, 0)
  await raceTest(void, 1)
  await raceTest(void, 2)
  await raceTest(int, 0)
  await raceTest(int, 1)
  await raceTest(int, 2)

View File

@ -1518,3 +1518,63 @@ suite "HTTP client testing suite":
res.isErr() and res.error == HttpAddressErrorType.NameLookupFailed
res.error.isRecoverableError()
not(res.error.isCriticalError())
asyncTest "HTTPS response headers buffer size test":
  # The server replies with a single header of exactly HttpMaxHeadersSize
  # bytes: a client with the default limit must fail with HttpReadError,
  # while one with a doubled limit must read the response successfully.
  # NOTE(review): despite the test title this runs over a NonSecure scheme -
  # confirm whether a TLS variant was intended.
  const HeadersSize = HttpMaxHeadersSize
  let expectValue =
    string.fromBytes(createBigMessage("HEADERSTEST", HeadersSize))
  proc handler(fence: RequestFence): Future[HttpResponseRef] {.
       async: (raises: [CancelledError]).} =
    if fence.isOk():
      let request = fence.get()
      try:
        case request.uri.path
        of "/test":
          let headers = HttpTable.init([("big-header", expectValue)])
          await request.respond(Http200, "ok", headers)
        else:
          await request.respond(Http404, "Page not found")
      except HttpWriteError as exc:
        defaultResponse(exc)
    else:
      defaultResponse()
  var server = createServer(initTAddress("127.0.0.1:0"), handler, false)
  server.start()
  let
    address = server.instance.localAddress()
    ha = getAddress(address, HttpClientScheme.NonSecure, "/test")
    session = HttpSessionRef.new()
  let
    req1 = HttpClientRequestRef.new(session, ha)
    req2 =
      HttpClientRequestRef.new(session, ha,
        maxResponseHeadersSize = HttpMaxHeadersSize * 2)
    res1 =
      # true only when the default-limit request overflowed while reading
      # the oversized headers.
      try:
        let res {.used.} = await send(req1)
        await closeWait(req1)
        await closeWait(res)
        false
      except HttpReadError:
        true
      except HttpError:
        await closeWait(req1)
        false
      except CancelledError:
        await closeWait(req1)
        false
    res2 = await send(req2)
  check:
    res1 == true
    res2.status == 200
    res2.headers.getString("big-header") == expectValue
  await req1.closeWait()
  await req2.closeWait()
  await res2.closeWait()
  await session.closeWait()
  await server.stop()
  await server.closeWait()

View File

@ -13,6 +13,11 @@ import stew/base10
{.used.}
# Trouble finding this if defined near its use for `data2.sorted`, etc. likely
# related to "generic sandwich" issues. If any test ever wants to `sort` a
# `seq[(string, seq[string])]` differently, they may need to re-work that test.
proc `<`(a, b: (string, seq[string])): bool = a[0] < b[0]
suite "HTTP server testing suite":
teardown:
checkLeaks()
@ -846,11 +851,11 @@ suite "HTTP server testing suite":
for key, value in table1.items(true):
data2.add((key, value))
check:
data1 == @[("Header2", "value2"), ("Header2", "VALUE3"),
("Header1", "value1")]
data2 == @[("Header2", @["value2", "VALUE3"]),
("Header1", @["value1"])]
check: # .sorted to not depend upon hash(key)-order
data1.sorted == sorted(@[("Header2", "value2"), ("Header2", "VALUE3"),
("Header1", "value1")])
data2.sorted == sorted(@[("Header2", @["value2", "VALUE3"]),
("Header1", @["value1"])])
table1.set("header2", "value4")
check:

View File

@ -8,6 +8,7 @@
import std/[macros, strutils]
import unittest2
import ../chronos
import ../chronos/config
{.used.}
@ -491,7 +492,7 @@ suite "Exceptions tracking":
proc testit2 {.async: (raises: [IOError]).} =
raise (ref IOError)()
proc test {.async: (raises: [ValueError, IOError]).} =
proc test {.async: (raises: [CancelledError, ValueError, IOError]).} =
await testit() or testit2()
proc noraises() {.raises: [].} =
@ -519,7 +520,7 @@ suite "Exceptions tracking":
noraises()
test "Nocancel errors":
test "Nocancel errors with raises":
proc testit {.async: (raises: [ValueError, CancelledError]).} =
await sleepAsync(5.milliseconds)
raise (ref ValueError)()
@ -535,6 +536,36 @@ suite "Exceptions tracking":
noraises()
test "Nocancel with no errors":
  # noCancel must shield the inner sleep from cancellation, so the wrapper
  # can legitimately declare raises: [] and cancelAndWait degenerates into
  # a plain wait for completion.
  proc inner {.async: (raises: [CancelledError]).} =
    await sleepAsync(5.milliseconds)
  proc test {.async: (raises: []).} =
    await noCancel inner()
  proc noraises() {.raises: [].} =
    let fut = test()
    waitFor(fut.cancelAndWait())
    waitFor(fut)
  noraises()
test "Nocancel errors without raises":
  # Even without raises annotations, the ValueError thrown by the shielded
  # future must survive a cancellation attempt and surface from waitFor.
  proc inner {.async.} =
    await sleepAsync(5.milliseconds)
    raise (ref ValueError)()
  proc test {.async.} =
    await noCancel inner()
  proc noraises() =
    expect(ValueError):
      let fut = test()
      waitFor(fut.cancelAndWait())
      waitFor(fut)
  noraises()
test "Defect on wrong exception type at runtime":
{.push warning[User]: off}
let f = InternalRaisesFuture[void, (ValueError,)]()
@ -556,6 +587,20 @@ suite "Exceptions tracking":
waitFor(callCatchAll())
test "Global handleException does not override local annotations":
when chronosHandleException:
proc unnanotated() {.async.} = raise (ref CatchableError)()
checkNotCompiles:
proc annotated() {.async: (raises: [ValueError]).} =
raise (ref CatchableError)()
checkNotCompiles:
proc noHandleException() {.async: (handleException: false).} =
raise (ref Exception)()
else:
skip()
test "Results compatibility":
proc returnOk(): Future[Result[int, string]] {.async: (raises: []).} =
ok(42)

View File

@ -1486,6 +1486,170 @@ suite "Stream Transport test suite":
await server.closeWait()
testResult
proc performAutoAddressTest(port: Port,
                            family: AddressFamily): Future[bool] {.
     async: (raises: []).} =
  ## Creates a server through the auto-address (port-only) constructor,
  ## connects to it over the ``family`` loopback address and returns whether
  ## both the client and the accepted server transport were established.
  ## Never raises: expected transport failures map to ``false``, everything
  ## unexpected trips an assert.
  let server =
    block:
      var currentPort = port
      var res: StreamServer
      for i in 0 ..< 10:
        res =
          try:
            # Bug fix: retry on the incremented `currentPort`; the previous
            # code passed the original `port` every time, so the 10-attempt
            # retry loop kept re-binding the identical port.
            createStreamServer(currentPort, flags = {ServerFlags.ReuseAddr})
          except TransportOsError as exc:
            echo "Unable to create server on port ", currentPort,
                 " with error: ", exc.msg
            currentPort = Port(uint16(currentPort) + 1'u16)
            nil
        if not(isNil(res)):
          break
      doAssert(not(isNil(res)), "Unable to create server, giving up")
      res
  var
    address =
      case family
      of AddressFamily.IPv4:
        try:
          initTAddress("127.0.0.1:0")
        except TransportAddressError as exc:
          raiseAssert exc.msg
      of AddressFamily.IPv6:
        try:
          # Bracketed IPv6 literal for consistency with the rest of the
          # suite (was the ambiguous "::1:0" form).
          initTAddress("[::1]:0")
        except TransportAddressError as exc:
          raiseAssert exc.msg
      of AddressFamily.Unix, AddressFamily.None:
        raiseAssert "Not allowed"
  # Target whatever port the server actually bound to.
  address.port = server.localAddress().port
  var acceptFut = server.accept()
  let
    clientTransp =
      try:
        let res = await connect(address).wait(2.seconds)
        Opt.some(res)
      except CatchableError:
        Opt.none(StreamTransport)
    serverTransp =
      if clientTransp.isSome():
        let res =
          try:
            await noCancel acceptFut
          except TransportError as exc:
            raiseAssert exc.msg
        Opt.some(res)
      else:
        Opt.none(StreamTransport)
  let testResult = clientTransp.isSome() and serverTransp.isSome()
  # Tear everything down before reporting; a never-fired accept must be
  # cancelled explicitly or it would leak.
  var pending: seq[FutureBase]
  if clientTransp.isSome():
    pending.add(closeWait(clientTransp.get()))
  if serverTransp.isSome():
    pending.add(closeWait(serverTransp.get()))
  else:
    pending.add(cancelAndWait(acceptFut))
  await noCancel allFutures(pending)
  try:
    server.stop()
  except TransportError as exc:
    raiseAssert exc.msg
  await server.closeWait()
  testResult
proc performAutoAddressTest2(
    address1: Opt[IpAddress],
    address2: Opt[IpAddress],
    port: Port,
    sendType: AddressFamily
): Future[bool] {.async: (raises: []).} =
  ## Binds a server to ``address1`` (auto-address when absent), connects from
  ## source ``address2`` (OS-selected when absent) to a ``sendType`` loopback
  ## destination, and returns whether the connection succeeded AND both
  ## transports report a remote address of family ``sendType``.
  ## Never raises: expected failures map to ``false``, the rest asserts.
  let
    server =
      block:
        var
          currentPort = port
          res: StreamServer
        for i in 0 ..< 10:
          res =
            try:
              # Bug fix: retry with the incremented `currentPort`; the old
              # code re-used the original `port` on every attempt, making
              # the retry loop a no-op.
              createStreamServer(currentPort, host = address1,
                                 flags = {ServerFlags.ReuseAddr})
            except TransportOsError as exc:
              echo "Unable to create server on port ", currentPort,
                   " with error: ", exc.msg
              currentPort = Port(uint16(currentPort) + 1'u16)
              nil
          if not(isNil(res)):
            break
        doAssert(not(isNil(res)), "Unable to create server, giving up")
        res
    serverAddr = server.localAddress()
    serverPort = serverAddr.port
    remoteAddress =
      try:
        case sendType
        of AddressFamily.IPv4:
          var res = initTAddress("127.0.0.1:0")
          res.port = serverPort
          res
        of AddressFamily.IPv6:
          var res = initTAddress("[::1]:0")
          res.port = serverPort
          res
        else:
          raiseAssert "Incorrect sending type"
      except TransportAddressError as exc:
        raiseAssert "Unable to initialize transport address, " &
                    "reason = " & exc.msg
    acceptFut = server.accept()
  let
    clientTransp =
      try:
        if address2.isSome():
          # Pin the client's source address when one was requested.
          let
            laddr = initTAddress(address2.get(), Port(0))
            res = await connect(remoteAddress, localAddress = laddr).
                  wait(2.seconds)
          Opt.some(res)
        else:
          let res = await connect(remoteAddress).wait(2.seconds)
          Opt.some(res)
      except CatchableError:
        Opt.none(StreamTransport)
    serverTransp =
      if clientTransp.isSome():
        let res =
          try:
            await noCancel acceptFut
          except TransportError as exc:
            raiseAssert exc.msg
        Opt.some(res)
      else:
        Opt.none(StreamTransport)
    testResult =
      clientTransp.isSome() and serverTransp.isSome() and
      (serverTransp.get().remoteAddress2().get().family == sendType) and
      (clientTransp.get().remoteAddress2().get().family == sendType)
  # Cleanup: close whatever was opened; cancel the accept if it never fired.
  var pending: seq[FutureBase]
  if clientTransp.isSome():
    pending.add(closeWait(clientTransp.get()))
  if serverTransp.isSome():
    pending.add(closeWait(serverTransp.get()))
  else:
    pending.add(cancelAndWait(acceptFut))
  await noCancel allFutures(pending)
  try:
    server.stop()
  except TransportError as exc:
    raiseAssert exc.msg
  await server.closeWait()
  testResult
markFD = getCurrentFD()
for i in 0..<len(addresses):
@ -1668,6 +1832,96 @@ suite "Stream Transport test suite":
DualStackType.Disabled, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] Auto-address constructor test (*:0)":
  # With IPv6 available the auto-address server binds `::`, reachable both
  # natively and via V4-mapped addresses; without it the server binds
  # `0.0.0.0`, reachable over plain IPv4. Either way, each available family
  # must be able to connect - which flattens to two independent checks.
  if isAvailable(AddressFamily.IPv6):
    check:
      (await performAutoAddressTest(Port(0), AddressFamily.IPv6)) == true
  if isAvailable(AddressFamily.IPv4):
    check:
      (await performAutoAddressTest(Port(0), AddressFamily.IPv4)) == true
asyncTest "[IP] Auto-address constructor test (*:30532)":
  # Same coverage as the *:0 test but with an explicit, non-zero port.
  # The original nested if/else ran the IPv4 check whenever IPv4 was
  # available regardless of the IPv6 branch, so two flat checks are
  # equivalent.
  if isAvailable(AddressFamily.IPv6):
    check:
      (await performAutoAddressTest(Port(30532), AddressFamily.IPv6)) ==
        true
  if isAvailable(AddressFamily.IPv4):
    check:
      (await performAutoAddressTest(Port(30532), AddressFamily.IPv4)) ==
        true
for portNumber in [Port(0), Port(30231)]:
  # Matrix of server-bind / client-source combinations for IPv6 mapping;
  # every variant is skipped wholesale on hosts without IPv6.
  asyncTest "[IP] IPv6 mapping test (auto-auto:" & $int(portNumber) & ")":
    if isAvailable(AddressFamily.IPv6):
      # Auto-bound server, auto-selected source: both the V4-mapped and the
      # native IPv6 path must work.
      let
        bindAddr = Opt.none(IpAddress)
        connAddr = Opt.none(IpAddress)
      check:
        (await performAutoAddressTest2(
          bindAddr, connAddr, portNumber, AddressFamily.IPv4))
        (await performAutoAddressTest2(
          bindAddr, connAddr, portNumber, AddressFamily.IPv6))
    else:
      skip()
  asyncTest "[IP] IPv6 mapping test (auto-ipv6:" & $int(portNumber) & ")":
    if isAvailable(AddressFamily.IPv6):
      # Auto-bound server, client pinned to the IPv6 loopback source.
      let
        bindAddr = Opt.none(IpAddress)
        connAddr = Opt.some(initTAddress("[::1]:0").toIpAddress())
      check:
        (await performAutoAddressTest2(
          bindAddr, connAddr, portNumber, AddressFamily.IPv6))
    else:
      skip()
  asyncTest "[IP] IPv6 mapping test (auto-ipv4:" & $int(portNumber) & ")":
    if isAvailable(AddressFamily.IPv6):
      # Auto-bound server, client pinned to the IPv4 loopback source.
      let
        bindAddr = Opt.none(IpAddress)
        connAddr = Opt.some(initTAddress("127.0.0.1:0").toIpAddress())
      check:
        (await performAutoAddressTest2(
          bindAddr, connAddr, portNumber, AddressFamily.IPv4))
    else:
      skip()
  asyncTest "[IP] IPv6 mapping test (ipv6-auto:" & $int(portNumber) & ")":
    if isAvailable(AddressFamily.IPv6):
      # Server explicitly on IPv6 loopback, auto-selected client source.
      let
        bindAddr = Opt.some(initTAddress("[::1]:0").toIpAddress())
        connAddr = Opt.none(IpAddress)
      check:
        (await performAutoAddressTest2(
          bindAddr, connAddr, portNumber, AddressFamily.IPv6))
    else:
      skip()
  asyncTest "[IP] IPv6 mapping test (ipv4-auto:" & $int(portNumber) & ")":
    if isAvailable(AddressFamily.IPv6):
      # Server explicitly on IPv4 loopback, auto-selected client source.
      let
        bindAddr = Opt.some(initTAddress("127.0.0.1:0").toIpAddress())
        connAddr = Opt.none(IpAddress)
      check:
        (await performAutoAddressTest2(
          bindAddr, connAddr, portNumber, AddressFamily.IPv4))
    else:
      skip()
test "File descriptors leak test":
when defined(windows):
# Windows handle numbers depends on many conditions, so we can't use

View File

@ -89,6 +89,9 @@ suite "Asynchronous timers & steps test suite":
$nanoseconds(1_000_000_900) == "1s900ns"
$nanoseconds(1_800_700_000) == "1s800ms700us"
$nanoseconds(1_800_000_600) == "1s800ms600ns"
nanoseconds(1_800_000_600).toString(0) == ""
nanoseconds(1_800_000_600).toString(1) == "1s"
nanoseconds(1_800_000_600).toString(2) == "1s800ms"
test "Asynchronous steps test":
var fut1 = stepsAsync(1)