From 6c873481acb6f071ab2ab0dbb60b894505bccfc6 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Fri, 1 Mar 2024 18:06:26 +0100 Subject: [PATCH] move `allFutureThrowing` helper to tests (#1037) Co-authored-by: Jacek Sieka --- libp2p/errors.nim | 22 -------------- libp2p/protocols/secure/secure.nim | 30 +++++++++++++------ libp2p/stream/lpstream.nim | 4 ++- libp2p/transports/tcptransport.nim | 27 ++++++++++++----- tests/errorhelpers.nim | 27 +++++++++++++++++ tests/helpers.nim | 4 +-- tests/hole-punching-interop/hole_punching.nim | 1 + 7 files changed, 73 insertions(+), 42 deletions(-) create mode 100644 tests/errorhelpers.nim diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 4f52ce1c1..ea47e12e0 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -45,28 +45,6 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped = debug "A future has failed, enable trace logging for details", error=exc.name trace "Exception details", msg=exc.msg -proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] = - var futs: seq[F] - for fut in args: - futs &= fut - proc call() {.async.} = - var first: ref CatchableError = nil - futs = await allFinished(futs) - for fut in futs: - if fut.failed: - let err = fut.readError() - if err of Defect: - raise err - else: - if err of CancelledError: - raise err - if isNil(first): - first = err - if not isNil(first): - raise first - - return call() - template tryAndWarn*(message: static[string]; body: untyped): untyped = try: body diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 9f1cad36e..0f820b002 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -91,21 +91,33 @@ proc handleConn(s: Secure, # we require this information in for example gossipsub sconn.transportDir = if initiator: Direction.Out else: Direction.In - proc cleanup() {.async.} = + proc cleanup() {.async: (raises: []).} = try: - let futs = [conn.join(), sconn.join()] - await futs[0] or futs[1] - for f in futs: - if not f.finished: await f.cancelAndWait() # cancel outstanding join() + block: + let + fut1 = conn.join() + fut2 = sconn.join() + await fut1 or fut2 # one join() completes, cancel outstanding join() + if not fut1.finished: await fut1.cancelAndWait() + if not fut2.finished: await fut2.cancelAndWait() + block: + let + fut1 = sconn.close() + fut2 = conn.close() + await allFutures(fut1, fut2) + if fut1.failed: + let err = fut1.error() + if not (err of CancelledError): + debug "error cleaning up secure connection", err = err.msg, sconn + if fut2.failed: + let err = fut2.error() + if not (err of CancelledError): + debug "error cleaning up secure connection", err = err.msg, sconn - await allFuturesThrowing( - sconn.close(), conn.close()) except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propagate CancelledError. discard - except CatchableError as exc: - debug "error cleaning up secure connection", err = exc.msg, sconn if not isNil(sconn): # All the errors are handled inside `cleanup()` procedure. diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index d31737338..5701b1693 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -160,7 +160,9 @@ method initStream*(s: LPStream) {.base.} = inc getStreamTracker(s.objName).opened trace "Stream created", s, objName = s.objName, dir = $s.dir -proc join*(s: LPStream): Future[void] {.public.} = +proc join*( + s: LPStream +): Future[void] {.async: (raises: [CancelledError], raw: true), public.} = ## Wait for the stream to be closed s.closeEvent.wait() diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index a2779be76..cc60f3f94 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -90,24 +90,35 @@ proc connHandler*(self: TcpTransport, timeout = self.connectionsTimeout )) - proc onClose() {.async.} = + proc onClose() {.async: (raises: []).} = try: - let futs = @[client.join(), conn.join()] - await futs[0] or futs[1] - for f in futs: - if not f.finished: await f.cancelAndWait() # cancel outstanding join() + block: + let + fut1 = client.join() + fut2 = conn.join() + await fut1 or fut2 # one join() completes, cancel outstanding join() + if not fut1.finished: await fut1.cancelAndWait() + if not fut2.finished: await fut2.cancelAndWait() trace "Cleaning up client", addrs = $client.remoteAddress, conn self.clients[dir].keepItIf( it != client ) - await allFuturesThrowing( - conn.close(), client.closeWait()) + + block: + let + fut1 = conn.close() + fut2 = client.closeWait() + await allFutures(fut1, fut2) + if fut1.failed: + let err = fut1.error() + debug "Error cleaning up client", errMsg = err.msg, conn + static: doAssert typeof(fut2).E is void # Cannot fail trace "Cleaned up client", addrs = $client.remoteAddress, conn - except CatchableError as exc: + except CancelledError as exc: let useExc {.used.} = exc debug "Error cleaning up client", errMsg = exc.msg, conn diff --git a/tests/errorhelpers.nim b/tests/errorhelpers.nim new file mode 100644 index 000000000..d7cea7c8f --- /dev/null +++ b/tests/errorhelpers.nim @@ -0,0 +1,27 @@ +import chronos + +proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] = + # This proc is only meant for use in tests / not suitable for general use. + # - Swallowing errors arbitrarily instead of aggregating them is bad design + # - It raises `CatchableError` instead of the union of the `futs` errors, + # inflating the caller's `raises` list unnecessarily. `macro` could fix it + var futs: seq[F] + for fut in args: + futs &= fut + proc call() {.async.} = + var first: ref CatchableError = nil + futs = await allFinished(futs) + for fut in futs: + if fut.failed: + let err = fut.readError() + if err of Defect: + raise err + else: + if err of CancelledError: + raise err + if isNil(first): + first = err + if not isNil(first): + raise first + + return call() diff --git a/tests/helpers.nim b/tests/helpers.nim index ad4404af2..f18a720b7 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -14,8 +14,8 @@ import ../libp2p/protocols/secure/secure import ../libp2p/switch import ../libp2p/nameresolving/[nameresolver, mockresolver] -import ./asyncunit -export asyncunit, mockresolver +import "."/[asyncunit, errorhelpers] +export asyncunit, errorhelpers, mockresolver const StreamTransportTrackerName = "stream.transport" diff --git a/tests/hole-punching-interop/hole_punching.nim b/tests/hole-punching-interop/hole_punching.nim index 390c807e7..eea281289 100644 --- a/tests/hole-punching-interop/hole_punching.nim +++ b/tests/hole-punching-interop/hole_punching.nim @@ -12,6 +12,7 @@ import ../../libp2p/[builders, protocols/connectivity/autonat/service, protocols/ping] import ../stubs/autonatclientstub +import ../errorhelpers proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch = let rng = newRng()