diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 2be053c49..6df139e0c 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -152,6 +152,12 @@ method unsubscribe*(f: FloodSub, for p in f.peers.values: await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) +method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = + await procCall PubSub(f).unsubscribeAll(topic) + + for p in f.peers.values: + await f.sendSubs(p, @[topic], false) + method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.peers = initTable[string, PubSubPeer]() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0c848771e..e3d6ed25c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -450,14 +450,29 @@ method unsubscribe*(g: GossipSub, topics: seq[TopicPair]) {.async.} = await procCall PubSub(g).unsubscribe(topics) - for pair in topics: - let topic = pair.topic - if topic in g.mesh: - let peers = g.mesh.getOrDefault(topic) - g.mesh.del(topic) + for (topic, handler) in topics: + # delete from mesh only if no handlers are left + if g.topics[topic].handler.len <= 0: + if topic in g.mesh: + let peers = g.mesh.getOrDefault(topic) + g.mesh.del(topic) - for peer in peers: - await peer.sendPrune(@[topic]) + var pending = newSeq[Future[void]]() + for peer in peers: + pending.add(peer.sendPrune(@[topic])) + checkFutures(await allFinished(pending)) + +method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = + await procCall PubSub(g).unsubscribeAll(topic) + + if topic in g.mesh: + let peers = g.mesh.getOrDefault(topic) + g.mesh.del(topic) + + var pending = newSeq[Future[void]]() + for peer in peers: + pending.add(peer.sendPrune(@[topic])) + checkFutures(await allFinished(pending)) method publish*(g: GossipSub, topic: string, diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 3cf027126..c4d17fe49 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -233,16 +233,16 @@ method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: - # metrics - libp2p_pubsub_topics.dec() for i, h in p.topics[t.topic].handler: if h == t.handler: p.topics[t.topic].handler.del(i) - # make sure we delete the topic if - # no more handlers are left - if p.topics[t.topic].handler.len <= 0: - p.topics.del(t.topic) + # make sure we delete the topic if + # no more handlers are left + if p.topics[t.topic].handler.len <= 0: + p.topics.del(t.topic) + # metrics + libp2p_pubsub_topics.dec() proc unsubscribe*(p: PubSub, topic: string, @@ -250,6 +250,10 @@ proc unsubscribe*(p: PubSub, ## unsubscribe from a ``topic`` string p.unsubscribe(@[(topic, handler)]) +method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} = + libp2p_pubsub_topics.dec() + p.topics.del(topic) + method subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.base, async.} = diff --git a/libp2p/switch.nim b/libp2p/switch.nim index ea40b8410..dc62f6a60 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -449,6 +449,13 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]) {.async.} = await s.pubSub.get().unsubscribe(topics) +proc unsubscribeAll*(s: Switch, topic: string) {.async.} = + ## unsubscribe from topics + if s.pubSub.isNone: + raise newNoPubSubException() + + await s.pubSub.get().unsubscribeAll(topic) + proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} = # pubslish to pubsub topic if s.pubSub.isNone: