move `allFutureThrowing` helper to tests (#1037)
Co-authored-by: Jacek Sieka <jacek@status.im>
This commit is contained in:
parent
d08ce17144
commit
6c873481ac
|
@ -45,28 +45,6 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
|
||||||
debug "A future has failed, enable trace logging for details", error=exc.name
|
debug "A future has failed, enable trace logging for details", error=exc.name
|
||||||
trace "Exception details", msg=exc.msg
|
trace "Exception details", msg=exc.msg
|
||||||
|
|
||||||
proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] =
|
|
||||||
var futs: seq[F]
|
|
||||||
for fut in args:
|
|
||||||
futs &= fut
|
|
||||||
proc call() {.async.} =
|
|
||||||
var first: ref CatchableError = nil
|
|
||||||
futs = await allFinished(futs)
|
|
||||||
for fut in futs:
|
|
||||||
if fut.failed:
|
|
||||||
let err = fut.readError()
|
|
||||||
if err of Defect:
|
|
||||||
raise err
|
|
||||||
else:
|
|
||||||
if err of CancelledError:
|
|
||||||
raise err
|
|
||||||
if isNil(first):
|
|
||||||
first = err
|
|
||||||
if not isNil(first):
|
|
||||||
raise first
|
|
||||||
|
|
||||||
return call()
|
|
||||||
|
|
||||||
template tryAndWarn*(message: static[string]; body: untyped): untyped =
|
template tryAndWarn*(message: static[string]; body: untyped): untyped =
|
||||||
try:
|
try:
|
||||||
body
|
body
|
||||||
|
|
|
@ -91,21 +91,33 @@ proc handleConn(s: Secure,
|
||||||
# we require this information in for example gossipsub
|
# we require this information in for example gossipsub
|
||||||
sconn.transportDir = if initiator: Direction.Out else: Direction.In
|
sconn.transportDir = if initiator: Direction.Out else: Direction.In
|
||||||
|
|
||||||
proc cleanup() {.async.} =
|
proc cleanup() {.async: (raises: []).} =
|
||||||
try:
|
try:
|
||||||
let futs = [conn.join(), sconn.join()]
|
block:
|
||||||
await futs[0] or futs[1]
|
let
|
||||||
for f in futs:
|
fut1 = conn.join()
|
||||||
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
|
fut2 = sconn.join()
|
||||||
|
await fut1 or fut2 # one join() completes, cancel outstanding join()
|
||||||
|
if not fut1.finished: await fut1.cancelAndWait()
|
||||||
|
if not fut2.finished: await fut2.cancelAndWait()
|
||||||
|
block:
|
||||||
|
let
|
||||||
|
fut1 = sconn.close()
|
||||||
|
fut2 = conn.close()
|
||||||
|
await allFutures(fut1, fut2)
|
||||||
|
if fut1.failed:
|
||||||
|
let err = fut1.error()
|
||||||
|
if not (err of CancelledError):
|
||||||
|
debug "error cleaning up secure connection", err = err.msg, sconn
|
||||||
|
if fut2.failed:
|
||||||
|
let err = fut2.error()
|
||||||
|
if not (err of CancelledError):
|
||||||
|
debug "error cleaning up secure connection", err = err.msg, sconn
|
||||||
|
|
||||||
await allFuturesThrowing(
|
|
||||||
sconn.close(), conn.close())
|
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
# This is top-level procedure which will work as separate task, so it
|
# This is top-level procedure which will work as separate task, so it
|
||||||
# do not need to propagate CancelledError.
|
# do not need to propagate CancelledError.
|
||||||
discard
|
discard
|
||||||
except CatchableError as exc:
|
|
||||||
debug "error cleaning up secure connection", err = exc.msg, sconn
|
|
||||||
|
|
||||||
if not isNil(sconn):
|
if not isNil(sconn):
|
||||||
# All the errors are handled inside `cleanup()` procedure.
|
# All the errors are handled inside `cleanup()` procedure.
|
||||||
|
|
|
@ -160,7 +160,9 @@ method initStream*(s: LPStream) {.base.} =
|
||||||
inc getStreamTracker(s.objName).opened
|
inc getStreamTracker(s.objName).opened
|
||||||
trace "Stream created", s, objName = s.objName, dir = $s.dir
|
trace "Stream created", s, objName = s.objName, dir = $s.dir
|
||||||
|
|
||||||
proc join*(s: LPStream): Future[void] {.public.} =
|
proc join*(
|
||||||
|
s: LPStream
|
||||||
|
): Future[void] {.async: (raises: [CancelledError], raw: true), public.} =
|
||||||
## Wait for the stream to be closed
|
## Wait for the stream to be closed
|
||||||
s.closeEvent.wait()
|
s.closeEvent.wait()
|
||||||
|
|
||||||
|
|
|
@ -90,24 +90,35 @@ proc connHandler*(self: TcpTransport,
|
||||||
timeout = self.connectionsTimeout
|
timeout = self.connectionsTimeout
|
||||||
))
|
))
|
||||||
|
|
||||||
proc onClose() {.async.} =
|
proc onClose() {.async: (raises: []).} =
|
||||||
try:
|
try:
|
||||||
let futs = @[client.join(), conn.join()]
|
block:
|
||||||
await futs[0] or futs[1]
|
let
|
||||||
for f in futs:
|
fut1 = client.join()
|
||||||
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
|
fut2 = conn.join()
|
||||||
|
await fut1 or fut2 # one join() completes, cancel outstanding join()
|
||||||
|
if not fut1.finished: await fut1.cancelAndWait()
|
||||||
|
if not fut2.finished: await fut2.cancelAndWait()
|
||||||
|
|
||||||
trace "Cleaning up client", addrs = $client.remoteAddress,
|
trace "Cleaning up client", addrs = $client.remoteAddress,
|
||||||
conn
|
conn
|
||||||
|
|
||||||
self.clients[dir].keepItIf( it != client )
|
self.clients[dir].keepItIf( it != client )
|
||||||
await allFuturesThrowing(
|
|
||||||
conn.close(), client.closeWait())
|
block:
|
||||||
|
let
|
||||||
|
fut1 = conn.close()
|
||||||
|
fut2 = client.closeWait()
|
||||||
|
await allFutures(fut1, fut2)
|
||||||
|
if fut1.failed:
|
||||||
|
let err = fut1.error()
|
||||||
|
debug "Error cleaning up client", errMsg = err.msg, conn
|
||||||
|
static: doAssert typeof(fut2).E is void # Cannot fail
|
||||||
|
|
||||||
trace "Cleaned up client", addrs = $client.remoteAddress,
|
trace "Cleaned up client", addrs = $client.remoteAddress,
|
||||||
conn
|
conn
|
||||||
|
|
||||||
except CatchableError as exc:
|
except CancelledError as exc:
|
||||||
let useExc {.used.} = exc
|
let useExc {.used.} = exc
|
||||||
debug "Error cleaning up client", errMsg = exc.msg, conn
|
debug "Error cleaning up client", errMsg = exc.msg, conn
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
import chronos
|
||||||
|
|
||||||
|
proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] =
|
||||||
|
# This proc is only meant for use in tests / not suitable for general use.
|
||||||
|
# - Swallowing errors arbitrarily instead of aggregating them is bad design
|
||||||
|
# - It raises `CatchableError` instead of the union of the `futs` errors,
|
||||||
|
# inflating the caller's `raises` list unnecessarily. `macro` could fix it
|
||||||
|
var futs: seq[F]
|
||||||
|
for fut in args:
|
||||||
|
futs &= fut
|
||||||
|
proc call() {.async.} =
|
||||||
|
var first: ref CatchableError = nil
|
||||||
|
futs = await allFinished(futs)
|
||||||
|
for fut in futs:
|
||||||
|
if fut.failed:
|
||||||
|
let err = fut.readError()
|
||||||
|
if err of Defect:
|
||||||
|
raise err
|
||||||
|
else:
|
||||||
|
if err of CancelledError:
|
||||||
|
raise err
|
||||||
|
if isNil(first):
|
||||||
|
first = err
|
||||||
|
if not isNil(first):
|
||||||
|
raise first
|
||||||
|
|
||||||
|
return call()
|
|
@ -14,8 +14,8 @@ import ../libp2p/protocols/secure/secure
|
||||||
import ../libp2p/switch
|
import ../libp2p/switch
|
||||||
import ../libp2p/nameresolving/[nameresolver, mockresolver]
|
import ../libp2p/nameresolving/[nameresolver, mockresolver]
|
||||||
|
|
||||||
import ./asyncunit
|
import "."/[asyncunit, errorhelpers]
|
||||||
export asyncunit, mockresolver
|
export asyncunit, errorhelpers, mockresolver
|
||||||
|
|
||||||
const
|
const
|
||||||
StreamTransportTrackerName = "stream.transport"
|
StreamTransportTrackerName = "stream.transport"
|
||||||
|
|
|
@ -12,6 +12,7 @@ import ../../libp2p/[builders,
|
||||||
protocols/connectivity/autonat/service,
|
protocols/connectivity/autonat/service,
|
||||||
protocols/ping]
|
protocols/ping]
|
||||||
import ../stubs/autonatclientstub
|
import ../stubs/autonatclientstub
|
||||||
|
import ../errorhelpers
|
||||||
|
|
||||||
proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch =
|
proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch =
|
||||||
let rng = newRng()
|
let rng = newRng()
|
||||||
|
|
Loading…
Reference in New Issue