diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 0292641..a7cf0ea 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -48,6 +48,8 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = if err of Defect: raise err else: + if err of CancelledError: + raise err if isNil(first): first = err if not isNil(first): diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 6164aa9..2f05e71 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -71,9 +71,5 @@ proc writeMsg*(conn: Connection, proc writeMsg*(conn: Connection, id: uint64, msgType: MessageType, - data: string) {.async, gcsafe.} = - # TODO: changing this to - #`await conn.writeMsg(id, msgType, data.toBytes())` - # causes all sorts of race conditions and hangs. - # DON'T DO IT! - result = conn.writeMsg(id, msgType, data.toBytes()) + data: string): Future[void] = + conn.writeMsg(id, msgType, data.toBytes()) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 33b6b96..51d2613 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -69,6 +69,8 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped = template withEOFExceptions(body: untyped): untyped = try: body + except CancelledError as exc: + raise exc except LPStreamEOFError as exc: trace "muxed connection EOF", exc = exc.msg except LPStreamClosedError as exc: diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index e48b520..317e056 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -115,9 +115,9 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} = trace "finished handling stream" doAssert(chann.closed, "connection not closed by handler!") except CancelledError as exc: - trace "cancling stream handler", exc = exc.msg + trace "cancelling stream handler", exc = exc.msg await chann.reset() - raise + raise exc except CatchableError as exc: trace "exception in stream handler", exc = exc.msg await chann.reset() diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 7e28d2c..e5bd542 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -93,6 +93,8 @@ method rpcHandler*(f: FloodSub, try: await h(t, msg.data) # trigger user provided handler + except CancelledError as exc: + raise exc except CatchableError as exc: trace "exception in message handler", exc = exc.msg diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 8871bc3..f0418d2 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -426,7 +426,9 @@ method rpcHandler*(g: GossipSub, localPeer = g.peerInfo.id, fromPeer = msg.fromPeer.pretty try: - await h(t, msg.data) # trigger user provided handler + await h(t, msg.data) # trigger user provided handler + except CancelledError as exc: + raise exc except CatchableError as exc: trace "exception in message handler", exc = exc.msg diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index b536c9a..366bc44 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -41,6 +41,8 @@ proc init*(C: type ChronosStream, template withExceptions(body: untyped) = try: body + except CancelledError as exc: + raise exc except TransportIncompleteError: # for all intents and purposes this is an EOF raise newLPStreamEOFError() diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 30cc756..582fbde 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -523,11 +523,28 @@ proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} = trace "exiting pubsub monitor", peer = $peer -proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = - if peerInfo.peerId notin s.pubsubMonitors: - s.pubsubMonitors[peerInfo.peerId] = s.pubsubMonitor(peerInfo) +proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.gcsafe.} = + ## Waits until ``server`` is not closed. + ## - result = s.pubsubMonitors.getOrDefault(peerInfo.peerId) + var retFuture = newFuture[void]("stream.transport.server.join") + let pubsubFut = s.pubsubMonitors.mgetOrPut( + peerInfo.peerId, + s.pubsubMonitor(peerInfo)) + + proc continuation(udata: pointer) {.gcsafe.} = + retFuture.complete() + + proc cancel(udata: pointer) {.gcsafe.} = + pubsubFut.removeCallback(continuation, cast[pointer](retFuture)) + + if not(pubsubFut.finished()): + pubsubFut.addCallback(continuation, cast[pointer](retFuture)) + retFuture.cancelCallback = cancel + else: + retFuture.complete() + + return retFuture proc subscribe*(s: Switch, topic: string, handler: TopicHandler) {.async.} = diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 9ddffcd..4d0963b 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -76,6 +76,8 @@ proc connHandler*(t: TcpTransport, if not(isNil(conn)): await conn.close() t.clients.keepItIf(it != client) + except CancelledError as exc: + raise exc except CatchableError as exc: trace "error cleaning up client", exc = exc.msg @@ -139,6 +141,8 @@ method close*(t: TcpTransport) {.async, gcsafe.} = trace "transport stopped" inc getTcpTransportTracker().closed + except CancelledError as exc: + raise exc except CatchableError as exc: trace "error shutting down tcp transport", exc = exc.msg diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 3f84794..fbcf843 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -135,7 +135,7 @@ suite "Mplex": let conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = - result = nil + discard ) chann = LPChannel.init(1, conn, true)