Fix unused warnings, result, asyncCheck and 80 cpl (#185)

* Fix sources to follow 80 characters per line.
Fix unused compilation warnings.
Refactor (remove result) handles.nim.
Fix tests to use asyncSpawn instead of asyncCheck.

* Fix handles for Unix platforms.
This commit is contained in:
Eugene Kabanov 2021-05-07 23:52:24 +03:00 committed by GitHub
parent fed6b0ac92
commit d270dba8a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 93 additions and 75 deletions

View File

@ -8,7 +8,7 @@
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import std/[os, tables, strutils, heapqueue, options, deques, cstrutils, sequtils]
import std/[os, tables, strutils, heapqueue, options, deques, sequtils]
import ./srcloc
export srcloc
@ -467,7 +467,8 @@ proc `$`(stackTraceEntries: seq[StackTraceEntry]): string =
if hint.len > 0:
result.add(spaces(indent+2) & "## " & hint & "\n")
except ValueError as exc:
return exc.msg # Shouldn't actually happen since we set the formatting string
return exc.msg # Shouldn't actually happen since we set the formatting
# string
when defined(chronosStackTrace):
proc injectStacktrace(future: FutureBase) =
@ -493,7 +494,8 @@ when defined(chronosStackTrace):
# newMsg.add "\n" & $entry
future.error.msg = newMsg
proc internalCheckComplete*(fut: FutureBase) {.raises: [Defect, CatchableError].} =
proc internalCheckComplete*(fut: FutureBase) {.
raises: [Defect, CatchableError].} =
# For internal use only. Used in asyncmacro
if not(isNil(fut.error)):
when defined(chronosStackTrace):
@ -505,7 +507,8 @@ proc internalRead*[T](fut: Future[T] | FutureVar[T]): T {.inline.} =
when T isnot void:
return fut.value
proc read*[T](future: Future[T] | FutureVar[T]): T {.raises: [Defect, CatchableError].} =
proc read*[T](future: Future[T] | FutureVar[T]): T {.
raises: [Defect, CatchableError].} =
## Retrieves the value of ``future``. Future must be finished otherwise
## this function will fail with a ``ValueError`` exception.
##
@ -517,7 +520,8 @@ proc read*[T](future: Future[T] | FutureVar[T]): T {.raises: [Defect, CatchableE
# TODO: Make a custom exception type for this?
raise newException(ValueError, "Future still in progress.")
proc readError*[T](future: Future[T]): ref CatchableError {.raises: [Defect, ValueError].} =
proc readError*[T](future: Future[T]): ref CatchableError {.
raises: [Defect, ValueError].} =
## Retrieves the exception stored in ``future``.
##
## An ``ValueError`` exception will be thrown if no exception exists
@ -576,7 +580,8 @@ proc asyncSpawn*(future: Future[void]) =
cb(nil)
proc asyncCheck*[T](future: Future[T]) {.
deprecated: "Raises Defect on future failure, fix your code and use asyncSpawn!".} =
deprecated: "Raises Defect on future failure, fix your code and use" &
" asyncSpawn!".} =
## This function used to raise an exception through the `poll` call if
## the given future failed - there's no way to handle such exceptions so this
## function is now an alias for `asyncSpawn`

View File

@ -11,7 +11,7 @@
{.push raises: [Defect].}
import std/[os, tables, strutils, heapqueue, lists, options, nativesockets, net,
deques]
deques]
import ./timer
export Port, SocketFlag
@ -584,7 +584,8 @@ elif unixPlatform:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
proc removeReader*(fd: AsyncFD) {.raises: [Defect, IOSelectorsException, ValueError].} =
proc removeReader*(fd: AsyncFD) {.
raises: [Defect, IOSelectorsException, ValueError].} =
## Stop watching the file descriptor ``fd`` for read availability.
let loop = getThreadDispatcher()
var newEvents: set[Event]
@ -598,7 +599,8 @@ elif unixPlatform:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.raises: [Defect, IOSelectorsException, ValueError].} =
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
raises: [Defect, IOSelectorsException, ValueError].} =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
let loop = getThreadDispatcher()
@ -614,7 +616,8 @@ elif unixPlatform:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
proc removeWriter*(fd: AsyncFD) {.raises: [Defect, IOSelectorsException, ValueError].} =
proc removeWriter*(fd: AsyncFD) {.
raises: [Defect, IOSelectorsException, ValueError].} =
## Stop watching the file descriptor ``fd`` for write availability.
let loop = getThreadDispatcher()
var newEvents: set[Event]
@ -679,7 +682,8 @@ elif unixPlatform:
when ioselSupportedPlatform:
proc addSignal*(signal: int, cb: CallbackFunc,
udata: pointer = nil): int {.raises: [Defect, IOSelectorsException, ValueError, OSError].} =
udata: pointer = nil): int {.
raises: [Defect, IOSelectorsException, ValueError, OSError].} =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
@ -694,7 +698,8 @@ elif unixPlatform:
do:
raise newException(ValueError, "File descriptor not registered.")
proc removeSignal*(sigfd: int) {.raises: [Defect, IOSelectorsException].} =
proc removeSignal*(sigfd: int) {.
raises: [Defect, IOSelectorsException].} =
## Remove watching signal ``signal``.
let loop = getThreadDispatcher()
loop.selector.unregister(sigfd)

View File

@ -7,7 +7,7 @@
# distribution, for details about the copyright.
#
import std/[macros, strutils]
import std/[macros]
proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} =
# Skips a nest of StmtList's.
@ -252,7 +252,8 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
if subtypeIsVoid:
let resultTemplate = quote do:
template result: auto {.used.} =
{.fatal: "You should not reference the `result` variable inside a void async proc".}
{.fatal: "You should not reference the `result` variable inside" &
" a void async proc".}
procBody = newStmtList(resultTemplate, procBody)
# fix #13899, `defer` should not escape its original scope
@ -308,7 +309,8 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
))
# If proc has an explicit gcsafe pragma, we add it to iterator as well.
if prc.pragma.findChild(it.kind in {nnkSym, nnkIdent} and it.strVal == "gcsafe") != nil:
if prc.pragma.findChild(it.kind in {nnkSym, nnkIdent} and
it.strVal == "gcsafe") != nil:
closureIterator.addPragma(newIdentNode("gcsafe"))
outerProcBody.add(closureIterator)

View File

@ -9,10 +9,8 @@
{.push raises: [Defect].}
import
std/[net, nativesockets],
./selectors2,
./asyncloop
import std/[net, nativesockets]
import ./asyncloop
when defined(windows):
import os, winlean
@ -43,33 +41,35 @@ const
proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool =
## Sets blocking mode on socket.
when defined(windows):
result = true
var mode = clong(ord(not blocking))
if ioctlsocket(s, FIONBIO, addr(mode)) == -1:
result = false
else:
result = true
var x: int = fcntl(s, F_GETFL, 0)
if x == -1:
result = false
false
else:
var mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK
true
else:
let x: int = fcntl(s, F_GETFL, 0)
if x == -1:
false
else:
let mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK
if fcntl(s, F_SETFL, mode) == -1:
result = false
false
else:
true
proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool =
## `setsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
var value = cint(optval)
result = setsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), SockLen(sizeof(value))) >= cint(0)
setsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), SockLen(sizeof(value))) >= cint(0)
proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: int): bool =
## `setsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
result = setsockopt(SocketHandle(socket), cint(level), cint(optname), value,
SockLen(valuelen)) >= cint(0)
setsockopt(SocketHandle(socket), cint(level), cint(optname), value,
SockLen(valuelen)) >= cint(0)
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool =
## `getsockopt()` for integer options.
@ -79,18 +79,20 @@ proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool =
if getsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(res), addr(size)) >= cint(0):
value = int(res)
result = true
true
else:
false
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: var int): bool =
## `getsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
result = getsockopt(SocketHandle(socket), cint(level), cint(optname),
value, cast[ptr Socklen](addr valuelen)) >= cint(0)
getsockopt(SocketHandle(socket), cint(level), cint(optname),
value, cast[ptr Socklen](addr valuelen)) >= cint(0)
proc getSocketError*(socket: AsyncFD, err: var int): bool =
## Recover error code associated with socket handle ``socket``.
result = getSockOpt(socket, cint(SOL_SOCKET), cint(SO_ERROR), err)
getSockOpt(socket, cint(SOL_SOCKET), cint(SO_ERROR), err)
proc createAsyncSocket*(domain: Domain, sockType: SockType,
protocol: Protocol): AsyncFD {.
@ -107,8 +109,8 @@ proc createAsyncSocket*(domain: Domain, sockType: SockType,
if not setSockOpt(AsyncFD(handle), SOL_SOCKET, SO_NOSIGPIPE, 1):
close(handle)
return asyncInvalidSocket
result = AsyncFD(handle)
register(result)
register(AsyncFD(handle))
AsyncFD(handle)
proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD {.
raises: [Defect, CatchableError].} =
@ -121,8 +123,8 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD {.
if not setSockOpt(AsyncFD(sock), SOL_SOCKET, SO_NOSIGPIPE, 1):
close(sock)
return asyncInvalidSocket
result = AsyncFD(sock)
register(result)
register(AsyncFD(sock))
AsyncFD(sock)
proc getMaxOpenFiles*(): int {.raises: [Defect, OSError].} =
## Returns maximum file descriptor number that can be opened by this process.
@ -131,12 +133,12 @@ proc getMaxOpenFiles*(): int {.raises: [Defect, OSError].} =
## will return constant value of 16384. You can get more information on this
## link https://docs.microsoft.com/en-us/archive/blogs/markrussinovich/pushing-the-limits-of-windows-handles
when defined(windows):
result = 16384
16384
else:
var limits: RLimit
if getrlimit(posix.RLIMIT_NOFILE, limits) != 0:
raiseOSError(osLastError())
result = int(limits.rlim_cur)
int(limits.rlim_cur)
proc setMaxOpenFiles*(count: int) {.raises: [Defect, OSError].} =
## Set maximum file descriptor number that can be opened by this process.
@ -154,7 +156,8 @@ proc setMaxOpenFiles*(count: int) {.raises: [Defect, OSError].} =
proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
## Create new asynchronouse pipe.
## Returns tuple of read pipe handle and write pipe handle``asyncInvalidPipe`` on error.
## Returns tuple of read pipe handle and write pipe handle``asyncInvalidPipe``
## on error.
when defined(windows):
var pipeIn, pipeOut: Handle
var pipeName: WideCString
@ -175,8 +178,7 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
# If error in {ERROR_ACCESS_DENIED, ERROR_PIPE_BUSY}, then named pipe
# with such name already exists.
if int32(err) != ERROR_ACCESS_DENIED and int32(err) != ERROR_PIPE_BUSY:
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
continue
else:
break
@ -186,39 +188,35 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
FILE_FLAG_OVERLAPPED, 0)
if pipeOut == INVALID_HANDLE_VALUE:
discard closeHandle(pipeIn)
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
var ovl = OVERLAPPED()
let res = connectNamedPipe(pipeIn, cast[pointer](addr ovl))
if res == 0:
let err = osLastError()
if int32(err) == ERROR_PIPE_CONNECTED:
case int32(err)
of ERROR_PIPE_CONNECTED:
discard
elif int32(err) == ERROR_IO_PENDING:
of ERROR_IO_PENDING:
var bytesRead = 0.Dword
if getOverlappedResult(pipeIn, addr ovl, bytesRead, 1) == 0:
discard closeHandle(pipeIn)
discard closeHandle(pipeOut)
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
else:
discard closeHandle(pipeIn)
discard closeHandle(pipeOut)
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
result = (read: AsyncFD(pipeIn), write: AsyncFD(pipeOut))
(read: AsyncFD(pipeIn), write: AsyncFD(pipeOut))
else:
var fds: array[2, cint]
if posix.pipe(fds) == -1:
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
if not(setSocketBlocking(SocketHandle(fds[0]), false)) or
not(setSocketBlocking(SocketHandle(fds[1]), false)):
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
result = (read: AsyncFD(fds[0]), write: AsyncFD(fds[1]))
(read: AsyncFD(fds[0]), write: AsyncFD(fds[1]))

View File

@ -32,8 +32,8 @@ when defined(nimdoc):
## file descriptors.
##
## If the transfer was successful, the number of bytes written to ``outfd``
## is stored in ``count``, and ``0`` returned. Note that a successful call to
## ``sendfile()`` may write fewer bytes than requested; the caller should
## is stored in ``count``, and ``0`` returned. Note that a successful call
## to ``sendfile()`` may write fewer bytes than requested; the caller should
## be prepared to retry the call if there were unsent bytes.
##
## On error, ``-1`` is returned.

View File

@ -363,7 +363,8 @@ proc toSAddr*(address: TransportAddress, sa: var Sockaddr_storage,
else:
discard
proc address*(ta: TransportAddress): IpAddress {.raises: [Defect, ValueError].} =
proc address*(ta: TransportAddress): IpAddress {.
raises: [Defect, ValueError].} =
## Converts ``TransportAddress`` to ``net.IpAddress`` object.
##
## Note its impossible to convert ``TransportAddress`` of ``Unix`` family,

View File

@ -35,7 +35,8 @@ type
writer: Future[void] # Writer vector completion Future
DatagramCallback* = proc(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.gcsafe, raises: [Defect].}
remote: TransportAddress): Future[void] {.
gcsafe, raises: [Defect].}
DatagramTransport* = ref object of RootRef
fd*: AsyncFD # File descriptor
@ -96,7 +97,8 @@ template setReadError(t, e: untyped) =
(t).state.incl(ReadError)
(t).error = getTransportOsError(e)
proc setupDgramTransportTracker(): DgramTransportTracker {.gcsafe, raises: [Defect].}
proc setupDgramTransportTracker(): DgramTransportTracker {.
gcsafe, raises: [Defect].}
proc getDgramTransportTracker(): DgramTransportTracker {.inline.} =
result = cast[DgramTransportTracker](getTracker(DgramTransportTrackerName))

View File

@ -199,8 +199,10 @@ template shiftVectorFile(v, o: untyped) =
(v).buf = cast[pointer](cast[uint]((v).buf) - cast[uint](o))
(v).offset += cast[uint]((o))
proc setupStreamTransportTracker(): StreamTransportTracker {.gcsafe, raises: [Defect].}
proc setupStreamServerTracker(): StreamServerTracker {.gcsafe, raises: [Defect].}
proc setupStreamTransportTracker(): StreamTransportTracker {.
gcsafe, raises: [Defect].}
proc setupStreamServerTracker(): StreamServerTracker {.
gcsafe, raises: [Defect].}
proc getStreamTransportTracker(): StreamTransportTracker {.inline.} =
result = cast[StreamTransportTracker](getTracker(StreamTransportTrackerName))
@ -960,9 +962,12 @@ when defined(windows):
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
server.apending = true
# TODO No way to report back errors!
server.asock = try: createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
except CatchableError as exc: raiseAsDefect exc, "createAsyncSocket"
server.asock =
try:
createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
except CatchableError as exc:
raiseAsDefect exc, "createAsyncSocket"
if server.asock == asyncInvalidSocket:
raiseAssert osErrorMsg(OSErrorCode(wsaGetLastError()))
@ -1039,8 +1044,8 @@ when defined(windows):
let err = OSErrorCode(wsaGetLastError())
server.asock.closeSocket()
if int32(err) == WSAENOTSOCK:
# This can be happened when server get closed, but continuation was
# already scheduled, so we failing it not with OS error.
# This can be happened when server get closed, but continuation
# was already scheduled, so we failing it not with OS error.
retFuture.fail(getServerUseClosedError())
else:
retFuture.fail(getTransportOsError(err))

View File

@ -64,9 +64,9 @@ suite "Asynchronous issues test suite":
await promise
checkstr = checkstr & name
asyncCheck believers("Foo")
asyncCheck believers("Bar")
asyncCheck believers("Baz")
asyncSpawn believers("Foo")
asyncSpawn believers("Bar")
asyncSpawn believers("Baz")
await sleepAsync(100.milliseconds)
promise.complete()

View File

@ -447,7 +447,7 @@ suite "Datagram Transport test suite":
await dgram1.join()
var dgram2 = newDatagramTransport(clientMark)
var data = "MESSAGE"
asyncCheck dgram2.sendTo(localta, data)
asyncSpawn dgram2.sendTo(localta, data)
await sleepAsync(2000.milliseconds)
result = (counter == 0)
dgram2.close()