diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim
index 1c4aa8e52..74f0b302c 100644
--- a/libp2p/muxers/mplex/lpchannel.nim
+++ b/libp2p/muxers/mplex/lpchannel.nim
@@ -174,12 +174,12 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
     peer = $s.conn.peerInfo
     # stack = getStackTrace()
 
-  trace "resetting channel"
-
   if s.closedLocal and s.isEof:
     trace "channel already closed or reset"
     return
 
+  trace "resetting channel"
+
   # we asyncCheck here because the other end
   # might be dead already - reset is always
   # optimistic
@@ -227,7 +227,7 @@ method close*(s: LPChannel) {.async, gcsafe.} =
         await s.reset()
         raise exc
       except CatchableError as exc:
-        trace "exception closing channel"
+        trace "exception closing channel", exc = exc.msg
        await s.reset()
 
      trace "lpchannel closed local"
@@ -244,6 +244,13 @@ proc timeoutMonitor(s: LPChannel) {.async.} =
   ## be reset
   ##
 
+  logScope:
+    id = s.id
+    initiator = s.initiator
+    name = s.name
+    oid = $s.oid
+    peer = $s.conn.peerInfo
+
   try:
     while true:
       await sleepAsync(s.timeout)
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index e3d6ed25c..94c130273 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -39,7 +39,7 @@ const GossipSubHistoryGossip* = 3
 
 # heartbeat interval
 const GossipSubHeartbeatInitialDelay* = 100.millis
-const GossipSubHeartbeatInterval* = 5.seconds # TODO: per the spec it should be 1 second
+const GossipSubHeartbeatInterval* = 1.seconds
 
 # fanout ttl
 const GossipSubFanoutTTL* = 1.minutes
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index c4d17fe49..251a091c4 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -242,17 +242,19 @@ method unsubscribe*(p: PubSub,
       if p.topics[t.topic].handler.len <= 0:
         p.topics.del(t.topic)
         # metrics
-        libp2p_pubsub_topics.dec()
+        libp2p_pubsub_topics.set(p.topics.len.int64)
 
 proc unsubscribe*(p: PubSub,
-  topic: string,
-  handler: TopicHandler): Future[void] =
+                  topic: string,
+                  handler: TopicHandler): Future[void] =
   ## unsubscribe from a ``topic`` string
+  ##
+
   p.unsubscribe(@[(topic, handler)])
 
 method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} =
-  libp2p_pubsub_topics.dec()
   p.topics.del(topic)
+  libp2p_pubsub_topics.set(p.topics.len.int64)
 
 method subscribe*(p: PubSub,
                   topic: string,
@@ -278,7 +280,7 @@ method subscribe*(p: PubSub,
   checkFutures(await allFinished(sent))
 
   # metrics
-  libp2p_pubsub_topics.inc()
+  libp2p_pubsub_topics.set(p.topics.len.int64)
 
 proc sendHelper*(p: PubSub,
                  sendPeers: HashSet[PubSubPeer],
diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim
index 619c7d452..a14bf2dc3 100644
--- a/libp2p/standard_setup.nim
+++ b/libp2p/standard_setup.nim
@@ -38,8 +38,8 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
                         transportFlags: set[ServerFlags] = {},
                         msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
                         rng = newRng(),
-                        inTimeout: Duration = 1.minutes,
-                        outTimeout: Duration = 1.minutes): Switch =
+                        inTimeout: Duration = 5.minutes,
+                        outTimeout: Duration = 5.minutes): Switch =
   proc createMplex(conn: Connection): Muxer =
     Mplex.init(
       conn,
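[annotation, not part of the patch] The pubsub.nim hunks above replace paired inc()/dec() calls on the libp2p_pubsub_topics gauge with set(p.topics.len.int64). Deriving the gauge from the table that actually holds the topics means a skipped or doubled unsubscribe path can no longer make the metric drift. A minimal sketch of the pattern, using the nim-metrics API; demo_pubsub_topics is a stand-in name:

    import metrics, tables

    declareGauge(demo_pubsub_topics, "number of pubsub topics")

    var topics = initTable[string, int]()
    topics["foobar"] = 1
    demo_pubsub_topics.set(topics.len.int64) # absolute value, derived from state
    topics.del("foobar")
    demo_pubsub_topics.set(topics.len.int64) # stays consistent after removal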
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index eca9691e8..ccd671d98 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -44,6 +44,9 @@ declareCounter(libp2p_dialed_peers, "dialed peers")
 declareCounter(libp2p_failed_dials, "failed dials")
 declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
 
+const
+  MaxPubsubReconnectAttempts* = 10
+
 type
   NoPubSubException* = object of CatchableError
 
@@ -67,6 +70,7 @@ type
     pubSub*: Option[PubSub]
     dialLock: Table[string, AsyncLock]
     hooks: Table[Lifecycle, HashSet[Hook]]
+    pubsubMonitors: Table[PeerId, Future[void]]
 
 proc newNoPubSubException(): ref NoPubSubException {.inline.} =
   result = newException(NoPubSubException, "no pubsub provided!")
@@ -95,9 +99,18 @@ proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
 proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
 
 proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
-  await conn.closeEvent.wait()
-  if s.pubSub.isSome:
-    await s.pubSub.get().unsubscribePeer(conn.peerInfo)
+  try:
+    await conn.closeEvent.wait()
+    if s.pubSub.isSome:
+      let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
+      if not(isNil(fut)) and not(fut.finished):
+        await fut.cancelAndWait()
+
+      await s.pubSub.get().unsubscribePeer(conn.peerInfo)
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    trace "exception cleaning pubsub peer", exc = exc.msg
 
 proc isConnected*(s: Switch, peer: PeerInfo): bool =
   ## returns true if the peer has one or more
@@ -322,7 +335,9 @@ proc internalConnect(s: Switch,
           continue
         break
     else:
-      trace "Reusing existing connection", oid = $conn.oid, direction = conn.dir
+      trace "Reusing existing connection", oid = $conn.oid,
+                                           direction = $conn.dir,
+                                           peer = $conn.peerInfo
   finally:
     if lock.locked():
       lock.release()
@@ -344,6 +359,9 @@ proc internalConnect(s: Switch,
     await s.subscribePeer(peer)
     asyncCheck s.cleanupPubSubPeer(conn)
 
+  trace "got connection", oid = $conn.oid,
+                          direction = $conn.dir,
+                          peer = $conn.peerInfo
   return conn
 
 proc connect*(s: Switch, peer: PeerInfo) {.async.} =
@@ -379,7 +397,7 @@ proc dial*(s: Switch,
     await cleanup()
     raise exc
   except CatchableError as exc:
-    trace "error dialing"
+    trace "error dialing", exc = exc.msg
     await cleanup()
     raise exc
@@ -399,7 +417,6 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
   proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
     try:
-
       conn.closeEvent.wait()
         .addCallback do(udata: pointer):
           asyncCheck s.triggerHooks(
@@ -451,7 +468,7 @@ proc stop*(s: Switch) {.async.} =
 
   trace "switch stopped"
 
-proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
+proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
   ## Subscribe to pub sub peer
   if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
     trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
@@ -468,7 +485,7 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
         return
 
       s.pubSub.get().subscribePeer(stream)
-
+      await stream.closeEvent.wait()
   except CancelledError as exc:
     if not(isNil(stream)):
       await stream.close()
@@ -480,6 +497,35 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
     if not(isNil(stream)):
       await stream.close()
 
+proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} =
+  ## while peer connected maintain a
+  ## pubsub connection as well
+  ##
+
+  var tries = 0
+  var backoffFactor = 5 # up to ~10 mins
+  var backoff = 1.seconds
+  while switch.isConnected(peer) and
+    tries < MaxPubsubReconnectAttempts:
+    try:
+      debug "subscribing to pubsub peer", peer = $peer
+      await switch.subscribePeerInternal(peer)
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      trace "exception in pubsub monitor", peer = $peer, exc = exc.msg
+    finally:
+      debug "awaiting backoff period before reconnecting", peer = $peer, backoff, tries
+      await sleepAsync(backoff) # allow the peer to cool down
+      backoff = backoff * backoffFactor
+      tries.inc()
+
+  trace "exiting pubsub monitor", peer = $peer
+
+proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
+  if peerInfo.peerId notin s.pubsubMonitors:
+    s.pubsubMonitors[peerInfo.peerId] = s.pubsubMonitor(peerInfo)
+
 proc subscribe*(s: Switch,
                 topic: string,
                 handler: TopicHandler) {.async.} =
   ## subscribe to a pubsub topic
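[annotation, not part of the patch] pubsubMonitor retries the pubsub subscription with exponential backoff: base 1.seconds, factor 5, at most MaxPubsubReconnectAttempts (10) tries. The per-attempt delay crosses the "up to ~10 mins" mark from the comment on the fifth wait (1 * 5^4 = 625 seconds, about 10.4 minutes), which appears to be what that comment refers to; later waits grow far beyond it. A quick sketch of the schedule, using the same chronos Duration arithmetic as the patch:

    import chronos

    var backoff = 1.seconds
    let backoffFactor = 5
    for tries in 0 ..< 10: # MaxPubsubReconnectAttempts
      echo "try ", tries, ": sleeping ", backoff
      backoff = backoff * backoffFactor # 1s, 5s, 25s, 125s, 625s, ...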
diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim
index 78d319d35..b8bff2713 100644
--- a/tests/pubsub/testfloodsub.nim
+++ b/tests/pubsub/testfloodsub.nim
@@ -54,7 +54,7 @@ suite "FloodSub":
         nodes[1].start()
       )
 
-      await subscribeNodes(nodes)
+      let subscribes = await subscribeNodes(nodes)
 
       await nodes[1].subscribe("foobar", handler)
       await waitSub(nodes[0], nodes[1], "foobar")
@@ -69,6 +69,7 @@ suite "FloodSub":
       )
 
       await allFuturesThrowing(nodesFut.concat())
+      await allFuturesThrowing(subscribes)
 
     check:
       waitFor(runTests()) == true
@@ -85,7 +86,7 @@ suite "FloodSub":
       awaiters.add((await nodes[0].start()))
       awaiters.add((await nodes[1].start()))
 
-      await subscribeNodes(nodes)
+      let subscribes = await subscribeNodes(nodes)
       await nodes[0].subscribe("foobar", handler)
       await waitSub(nodes[1], nodes[0], "foobar")
@@ -95,6 +96,8 @@ suite "FloodSub":
       result = await completionFut.wait(5.seconds)
 
       await allFuturesThrowing(nodes[0].stop(), nodes[1].stop())
+
+      await allFuturesThrowing(subscribes)
       await allFuturesThrowing(awaiters)
 
     check:
@@ -112,7 +115,7 @@ suite "FloodSub":
       awaiters.add((await nodes[0].start()))
       awaiters.add((await nodes[1].start()))
 
-      await subscribeNodes(nodes)
+      let subscribes = await subscribeNodes(nodes)
       await nodes[1].subscribe("foobar", handler)
       await waitSub(nodes[0], nodes[1], "foobar")
@@ -131,6 +134,8 @@ suite "FloodSub":
       await allFuturesThrowing(
         nodes[0].stop(),
         nodes[1].stop())
+
+      await allFuturesThrowing(subscribes)
       await allFuturesThrowing(awaiters)
 
       result = true
@@ -147,7 +152,7 @@ suite "FloodSub":
       awaiters.add((await nodes[0].start()))
       awaiters.add((await nodes[1].start()))
 
-      await subscribeNodes(nodes)
+      let subscribes = await subscribeNodes(nodes)
       await nodes[1].subscribe("foobar", handler)
       await waitSub(nodes[0], nodes[1], "foobar")
@@ -164,6 +169,8 @@ suite "FloodSub":
       await allFuturesThrowing(
         nodes[0].stop(),
         nodes[1].stop())
+
+      await allFuturesThrowing(subscribes)
       await allFuturesThrowing(awaiters)
 
       result = true
@@ -182,7 +189,7 @@ suite "FloodSub":
       awaiters.add((await nodes[0].start()))
       awaiters.add((await nodes[1].start()))
 
-      await subscribeNodes(nodes)
+      let subscribes = await subscribeNodes(nodes)
       await nodes[1].subscribe("foo", handler)
       await waitSub(nodes[0], nodes[1], "foo")
       await nodes[1].subscribe("bar", handler)
@@ -203,6 +210,8 @@ suite "FloodSub":
       await allFuturesThrowing(
         nodes[0].stop(),
         nodes[1].stop())
+
+      await allFuturesThrowing(subscribes)
       await allFuturesThrowing(awaiters)
 
       result = true
@@ -237,7 +246,7 @@ suite "FloodSub":
       for i in 0..<runs:
 
-      await subscribeNodes(nodes)
+      let subscribes = await subscribeNodes(nodes)
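[annotation, not part of the patch] With subscribePeer now installing a long-lived monitor future instead of blocking, every test keeps the futures subscribeNodes returns and drains them during teardown; the switches are stopped first so the monitors fall out of their isConnected loop. A self-contained chronos sketch of that shutdown ordering; the names are illustrative only:

    import chronos

    proc monitor(stopped: ref bool) {.async.} =
      # stand-in for pubsubMonitor: loop until the peer is "disconnected"
      while not stopped[]:
        await sleepAsync(10.millis)

    proc main() {.async.} =
      let stopped = new bool
      let monitors = @[monitor(stopped), monitor(stopped)]
      await sleepAsync(50.millis) # the test body runs here
      stopped[] = true            # analogous to stopping the switches
      await allFutures(monitors)  # analogous to allFuturesThrowing(subscribes)

    waitFor main()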
diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim
--- a/tests/pubsub/testgossipsub.nim
+++ b/tests/pubsub/testgossipsub.nim
@@ ... @@ suite "GossipSub":
           gossip.mesh["foobar"].len > 0
 
       await allFuturesThrowing(nodes.mapIt(it.stop()))
+
+      await allFuturesThrowing(subscribes)
       await allFuturesThrowing(awaitters)
 
       result = true
@@ -437,7 +454,7 @@ suite "GossipSub":
           secureManagers = [SecureProtocol.Secio])
         awaitters.add((await nodes[i].start()))
 
-      await subscribeSparseNodes(nodes)
+      let subscribes = await subscribeSparseNodes(nodes, 1)
 
       var seen: Table[string, int]
       var subs: seq[Future[void]]
@@ -468,7 +485,7 @@ suite "GossipSub":
       check: seen.len >= runs
       for k, v in seen.pairs:
         check: v >= 1
-
+
       for node in nodes:
         var gossip: GossipSub = GossipSub(node.pubSub.get())
         check:
@@ ... @@ suite "GossipSub":
           gossip.mesh["foobar"].len > 0
 
       await allFuturesThrowing(nodes.mapIt(it.stop()))
+
+      await allFuturesThrowing(subscribes)
       await allFuturesThrowing(awaitters)
 
       result = true
diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim
index 337364cc8..d24447654 100644
--- a/tests/pubsub/utils.nim
+++ b/tests/pubsub/utils.nim
@@ -9,36 +9,33 @@
 proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
   for i in 0..<num:
diff --git a/tests/testinterop.nim b/tests/testinterop.nim
--- a/tests/testinterop.nim
+++ b/tests/testinterop.nim
@@ ... @@ proc testPubSubDaemonPublish(gossip: bool = false,
     if times >= count and not finished:
       finished = true
 
-  await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
-                                               daemonPeer.addresses))
+  let peer = NativePeerInfo.init(
+    daemonPeer.peer,
+    daemonPeer.addresses)
+  await nativeNode.connect(peer)
+  let subscribeHandle = nativeNode.subscribePeer(peer)
+
   await sleepAsync(1.seconds)
   await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@@ -113,6 +117,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
   await nativeNode.stop()
   await allFutures(awaiters)
   await daemonNode.close()
+  await subscribeHandle
 
 proc testPubSubNodePublish(gossip: bool = false,
                            count: int = 1): Future[bool] {.async.} =
@@ -134,8 +139,11 @@ proc testPubSubNodePublish(gossip: bool = false,
   let awaiters = nativeNode.start()
   let nativePeer = nativeNode.peerInfo
 
-  await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
-                                               daemonPeer.addresses))
+  let peer = NativePeerInfo.init(
+    daemonPeer.peer,
+    daemonPeer.addresses)
+  await nativeNode.connect(peer)
+  let subscribeHandle = nativeNode.subscribePeer(peer)
 
   await sleepAsync(1.seconds)
   await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@@ -168,6 +176,7 @@ proc testPubSubNodePublish(gossip: bool = false,
   await nativeNode.stop()
   await allFutures(awaiters)
   await daemonNode.close()
+  await subscribeHandle
 
 suite "Interop":
   # TODO: chronos transports are leaking,
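[annotation, not part of the patch] The tests/pubsub/utils.nim hunk above survives only partially in this copy. From the way the tests consume it (let subscribes = await subscribeNodes(nodes) followed by allFuturesThrowing(subscribes)), the helper presumably collects the subscribePeer monitor futures for every node pair. A plausible shape, under exactly that assumption; the import path is assumed as well:

    import chronos
    import ../../libp2p/standard_setup # assumed import for Switch

    proc subscribeNodes*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} =
      var pending: seq[Future[void]]
      for dialer in nodes:
        for node in nodes:
          if dialer.peerInfo.peerId != node.peerInfo.peerId:
            # subscribePeer is async; keep its future so tests can drain it
            pending.add(dialer.subscribePeer(node.peerInfo))
      return pending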