diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim
index 45aaca357..ef75c86a8 100644
--- a/libp2p/connmanager.nim
+++ b/libp2p/connmanager.nim
@@ -53,6 +53,9 @@ proc contains*(c: ConnManager, conn: Connection): bool =
   if isNil(conn.peerInfo):
     return

+  if conn.peerInfo.peerId notin c.conns:
+    return
+
   return conn in c.conns[conn.peerInfo.peerId]

 proc contains*(c: ConnManager, peerId: PeerID): bool =
@@ -121,7 +124,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
   ## triggers the connections resource cleanup
   ##

-  await conn.closeEvent.wait()
+  await conn.join()
   trace "triggering connection cleanup"
   await c.cleanupConn(conn)

diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim
index 2f05e711f..15938e700 100644
--- a/libp2p/muxers/mplex/coder.nim
+++ b/libp2p/muxers/mplex/coder.nim
@@ -71,5 +71,5 @@ proc writeMsg*(conn: Connection,
 proc writeMsg*(conn: Connection,
                id: uint64,
                msgType: MessageType,
-               data: string): Future[void] =
+               data: string): Future[void] {.gcsafe.} =
   conn.writeMsg(id, msgType, data.toBytes())
diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim
index 33b6b9610..3e25a0bc6 100644
--- a/libp2p/muxers/mplex/lpchannel.nim
+++ b/libp2p/muxers/mplex/lpchannel.nim
@@ -83,6 +83,7 @@ proc closeMessage(s: LPChannel) {.async.} =
     name = s.name
     oid = $s.oid
     peer = $s.conn.peerInfo
+    closed = s.closed
     # stack = getStackTrace()

   ## send close message - this will not raise
@@ -99,6 +100,7 @@ proc resetMessage(s: LPChannel) {.async.} =
     name = s.name
     oid = $s.oid
     peer = $s.conn.peerInfo
+    closed = s.closed
     # stack = getStackTrace()

   ## send reset message - this will not raise
@@ -115,6 +117,7 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
     name = s.name
     oid = $s.oid
     peer = $s.conn.peerInfo
+    closed = s.closed
     # stack = getStackTrace()

   ## NOTE: Don't call withExcAndLock or withWriteLock,
@@ -131,6 +134,7 @@ proc closeRemote*(s: LPChannel) {.async.} =
     name = s.name
     oid = $s.oid
     peer = $s.conn.peerInfo
+    closed = s.closed
     # stack = getStackTrace()

   trace "got EOF, closing channel"
@@ -163,6 +167,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
     name = s.name
     oid = $s.oid
     peer = $s.conn.peerInfo
+    closed = s.closed
     # stack = getStackTrace()

   if s.closedLocal and s.isEof:
@@ -198,6 +203,7 @@
     name = s.name
     oid = $s.oid
     peer = $s.conn.peerInfo
+    closed = s.closed
     # stack = getStackTrace()

   if s.closedLocal:
@@ -211,6 +217,7 @@
       await s.closeMessage().wait(2.minutes)
       if s.atEof: # already closed by remote close parent buffer immediately
         await procCall BufferStream(s).close()
+      trace "lpchannel closed local"
     except CancelledError as exc:
       await s.reset()
       raise exc
@@ -218,8 +225,6 @@
       trace "exception closing channel", exc = exc.msg
       await s.reset()

-  trace "lpchannel closed local"
-
   s.closedLocal = true
   asyncCheck closeInternal()

diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim
index 317e056a5..e6b519d04 100644
--- a/libp2p/muxers/mplex/mplex.nim
+++ b/libp2p/muxers/mplex/mplex.nim
@@ -96,7 +96,7 @@ proc newStreamInternal*(m: Mplex,
 proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
   ## remove the local channel from the internal tables
   ##
-  await chann.closeEvent.wait()
+  await chann.join()
   if not isNil(chann):
     m.getChannelList(chann.initiator).del(chann.id)
     trace "cleaned up channel", id = chann.id
diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 9b6feb772..6c696e4bb 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -54,7 +54,7 @@ method subscribeTopic*(f: FloodSub,
 method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
   ## handle peer disconnects
   ##
-
+
   procCall PubSub(f).handleDisconnect(peer)

   if not(isNil(peer)) and peer.peerInfo notin f.conns:
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 62a747373..01c004ec4 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -248,24 +248,27 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
   ##

   procCall FloodSub(g).handleDisconnect(peer)

+  if not(isNil(peer)) and peer.peerInfo notin g.conns:
+    for t in toSeq(g.gossipsub.keys):
+      g.gossipsub.removePeer(t, peer)

-    when defined(libp2p_expensive_metrics):
-      libp2p_gossipsub_peers_per_topic_gossipsub
-        .set(g.gossipsub.peers(t).int64, labelValues = [t])
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_peers_per_topic_gossipsub
+          .set(g.gossipsub.peers(t).int64, labelValues = [t])

-    libp2p_gossipsub_peers_per_topic_gossipsub
-      .set(g.gossipsub.peers(t).int64, labelValues = [t])
+    for t in toSeq(g.mesh.keys):
+      g.mesh.removePeer(t, peer)

-    when defined(libp2p_expensive_metrics):
-      libp2p_gossipsub_peers_per_topic_mesh
-        .set(g.mesh.peers(t).int64, labelValues = [t])
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_peers_per_topic_mesh
+          .set(g.mesh.peers(t).int64, labelValues = [t])

-    libp2p_gossipsub_peers_per_topic_mesh
-      .set(g.mesh.peers(t).int64, labelValues = [t])
+    for t in toSeq(g.fanout.keys):
+      g.fanout.removePeer(t, peer)

-    when defined(libp2p_expensive_metrics):
-      libp2p_gossipsub_peers_per_topic_fanout
-        .set(g.fanout.peers(t).int64, labelValues = [t])
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_peers_per_topic_fanout
+          .set(g.fanout.peers(t).int64, labelValues = [t])

 method subscribePeer*(p: GossipSub, conn: Connection) =
@@ -434,8 +437,7 @@
       # forward the message to all peers interested in it
       let published = await g.publishHelper(toSendPeers, m.messages,
                                             DefaultSendTimeout)
-
-      trace "forwared message to peers", peers = published
+      trace "forwarded message to peers", peers = published, msgs = m.messages

     var respControl: ControlMessage
     if m.control.isSome:
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 90559b26d..f7bbaee06 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -63,7 +63,7 @@ type
 method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
   ## handle peer disconnects
   ##
-
+
   if not(isNil(peer)) and peer.peerInfo notin p.conns:
     trace "deleting peer", peer = peer.id
     peer.onConnect.fire() # Make sure all pending sends are unblocked
@@ -76,7 +76,8 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
 proc onConnClose(p: PubSub, conn: Connection) {.async.} =
   try:
     let peer = conn.peerInfo
-    await conn.closeEvent.wait()
+    await conn.join()
+    trace "triggering onConnClose cleanup", peer = $conn

     if peer in p.conns:
       p.conns[peer].excl(conn)
@@ -115,7 +116,7 @@
     trace "processing messages", msg = m.shortLog
     if m.subscriptions.len > 0: # if there are any subscriptions
       for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
-        trace "about to subscribe to topic", topicId = s.topic
trace "about to subscribe to topic", topicId = s.topic, peer = $peer await p.subscribeTopic(s.topic, s.subscribe, peer.id) proc getOrCreatePeer(p: PubSub, @@ -282,10 +283,13 @@ proc publishHelper*(p: PubSub, let f = sent.filterIt(it.fut == s) if f.len > 0: if s.failed: - trace "sending messages to peer failed", peer = f[0].id + trace "sending messages to peer failed", peer = f[0].id, + exc = f[0].fut.readError().msg, + msgs = msgs failed.add(f[0].id) else: - trace "sending messages to peer succeeded", peer = f[0].id + trace "sending messages to peer succeeded", peer = f[0].id, + msgs = msgs published.add(f[0].id) for f in failed: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index c215c3828..729f87d08 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -180,7 +180,8 @@ proc send*( let sendFut = sendToRemote() try: - await sendFut.wait(timeout) + # await sendFut.wait(timeout) + await sendFut except CatchableError as exc: trace "unable to send to remote", exc = exc.msg if not sendFut.finished: diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index aeff8fa50..eacd5d691 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -60,7 +60,7 @@ proc handleConn*(s: Secure, initiator: bool): Future[Connection] {.async, gcsafe.} = var sconn = await s.handshake(conn, initiator) if not isNil(sconn): - conn.closeEvent.wait() + conn.join() .addCallback do(udata: pointer = nil): asyncCheck sconn.close() diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index a14bf2dc3..9db1887ee 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -38,7 +38,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey), transportFlags: set[ServerFlags] = {}, msgIdProvider: MsgIdProvider = defaultMsgIdProvider, rng = newRng(), - inTimeout: Duration = 5.minutes, + inTimeout: Duration = 1.minutes, outTimeout: Duration = 5.minutes): Switch = proc createMplex(conn: Connection): Muxer = Mplex.init( diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 0b996e7fa..8779377de 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -101,7 +101,7 @@ proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} = try: - await conn.closeEvent.wait() + await conn.join() trace "about to cleanup pubsub peer" if s.pubSub.isSome: let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId) @@ -305,7 +305,7 @@ proc internalConnect(s: Switch, # make sure to assign the peer to the connection conn.peerInfo = PeerInfo.init(peerId, addrs) - conn.closeEvent.wait() + conn.join() .addCallback do(udata: pointer): asyncCheck s.triggerHooks( conn.peerInfo, @@ -326,7 +326,10 @@ proc internalConnect(s: Switch, s.connManager.storeOutgoing(uconn) asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded) conn = uconn - trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo + trace "dial successful", oid = $conn.oid, + upgraddedOid = $uconn.oid, + peer = $conn.peerInfo + except CatchableError as exc: if not(isNil(conn)): await conn.close() @@ -355,7 +358,6 @@ proc internalConnect(s: Switch, peer = shortLog(conn.peerInfo) asyncCheck s.cleanupPubSubPeer(conn) - asyncCheck s.subscribePeer(peerId) trace "got connection", oid = $conn.oid, direction = $conn.dir, @@ -418,7 +420,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc handle(conn: Connection): 
   proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
     try:
-      conn.closeEvent.wait()
+      conn.join()
         .addCallback do(udata: pointer):
           asyncCheck s.triggerHooks(
             conn.peerInfo,
@@ -489,7 +491,7 @@ proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
         return

       s.pubSub.get().subscribePeer(stream)
-      await stream.closeEvent.wait()
+      await stream.join()
     except CancelledError as exc:
       if not(isNil(stream)):
         await stream.close()
@@ -498,6 +500,7 @@ proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
     except CatchableError as exc:
       trace "exception in subscribe to peer", peer = peerId,
         exc = exc.msg
+
     finally:
       if not(isNil(stream)):
         await stream.close()
@@ -510,24 +513,25 @@
     try:
       trace "subscribing to pubsub peer", peer = peerId
       await s.subscribePeerInternal(peerId)
+
+      trace "sleeping before trying pubsub peer", peer = peerId
     except CancelledError as exc:
       raise exc
     except CatchableError as exc:
       trace "exception in pubsub monitor", peer = peerId, exc = exc.msg
-    finally:
-      trace "sleeping before trying pubsub peer", peer = peerId
-      await sleepAsync(1.seconds) # allow the peer to cooldown
+
+    await sleepAsync(1.seconds) # allow the peer to cooldown

   trace "exiting pubsub monitor", peer = peerId

-proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.gcsafe.} =
+proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
   ## Waits until ``server`` is not closed.
   ##
   var retFuture = newFuture[void]("stream.transport.server.join")
   let pubsubFut = s.pubsubMonitors.mgetOrPut(
-    peerInfo.peerId,
-    s.pubsubMonitor(peerInfo))
+    peerId,
+    s.pubsubMonitor(peerId))

   proc continuation(udata: pointer) {.gcsafe.} =
     retFuture.complete()

@@ -630,7 +634,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =

     # try establishing a pubsub connection
     asyncCheck s.cleanupPubSubPeer(muxer.connection)
-    asyncCheck s.subscribePeer(muxer.connection.peerInfo.peerId)

   except CancelledError as exc:
     await muxer.close()
diff --git a/tests/testconnmngr.nim b/tests/testconnmngr.nim
index 568bc9187..d03d1a44d 100644
--- a/tests/testconnmngr.nim
+++ b/tests/testconnmngr.nim
@@ -33,7 +33,7 @@ suite "Connection Manager":
     connMngr.storeConn(conn)
     check conn in connMngr

-    let peerConn = connMngr.selectConn(peer)
+    let peerConn = connMngr.selectConn(peer.peerId)
     check peerConn == conn
     check peerConn.dir == Direction.In

@@ -62,8 +62,8 @@
     check conn1 in connMngr
     check conn2 in connMngr

-    let outConn = connMngr.selectConn(peer, Direction.Out)
-    let inConn = connMngr.selectConn(peer, Direction.In)
+    let outConn = connMngr.selectConn(peer.peerId, Direction.Out)
+    let inConn = connMngr.selectConn(peer.peerId, Direction.In)

     check outConn != inConn
     check outConn.dir == Direction.Out
@@ -83,7 +83,7 @@
     connMngr.storeMuxer(muxer)
     check muxer in connMngr

-    let stream = await connMngr.getMuxedStream(peer)
+    let stream = await connMngr.getMuxedStream(peer.peerId)
     check not(isNil(stream))
     check stream.peerInfo == peer

@@ -103,8 +103,8 @@
     connMngr.storeMuxer(muxer)
     check muxer in connMngr

-    check not(isNil((await connMngr.getMuxedStream(peer, Direction.In))))
-    check isNil((await connMngr.getMuxedStream(peer, Direction.Out)))
+    check not(isNil((await connMngr.getMuxedStream(peer.peerId, Direction.In))))
+    check isNil((await connMngr.getMuxedStream(peer.peerId, Direction.Out)))

   waitFor(test())

@@ -179,14 +179,13 @@
     check conn in connMngr
     check muxer in connMngr

-    check not(isNil(connMngr.selectConn(peer, dir)))
+    check not(isNil(connMngr.selectConn(peer.peerId, dir)))

-    check peer in connMngr.peers
-    await connMngr.dropPeer(peer)
+    check peer.peerId in connMngr
+    await connMngr.dropPeer(peer.peerId)

-    check peer notin connMngr.peers
-    check isNil(connMngr.selectConn(peer, Direction.In))
-    check isNil(connMngr.selectConn(peer, Direction.Out))
-    check connMngr.peers.len == 0
+    check peer.peerId notin connMngr
+    check isNil(connMngr.selectConn(peer.peerId, Direction.In))
+    check isNil(connMngr.selectConn(peer.peerId, Direction.Out))

   waitFor(test())
diff --git a/tests/testnative.nim b/tests/testnative.nim
index 9f75ffec6..64091a6b6 100644
--- a/tests/testnative.nim
+++ b/tests/testnative.nim
@@ -15,6 +15,7 @@ import testmultibase,
        testpeer

 import testtransport,
+       testconnmngr,
        testmultistream,
        testbufferstream,
        testidentify,
diff --git a/tests/testswitch.nim b/tests/testswitch.nim
index 89fe5c481..1095e24a8 100644
--- a/tests/testswitch.nim
+++ b/tests/testswitch.nim
@@ -114,14 +114,14 @@ suite "Switch":
       await sleepAsync(2.seconds) # wait a little for cleanup to happen

       var bufferTracker = getTracker(BufferStreamTrackerName)
-      # echo bufferTracker.dump()
+      echo bufferTracker.dump()

       # plus 4 for the pubsub streams
       check (BufferStreamTracker(bufferTracker).opened ==
         (BufferStreamTracker(bufferTracker).closed + 4.uint64))

       var connTracker = getTracker(ConnectionTrackerName)
-      # echo connTracker.dump()
+      echo connTracker.dump()

       # plus 8 is for the secured connection and the socket
       # and the pubsub streams that won't clean up until