From 3b088f898045ceb72387effa1ef81938959aa725 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Tue, 21 Jul 2020 01:16:13 +0900 Subject: [PATCH] Fix some unsubscribe issues and add unsubscribeAll helper (#282) * Fix some unsub issues and add unsuball helper * batch sendprune in unsubscribe methods * add unsubscribeAll for floodsub --- libp2p/protocols/pubsub/floodsub.nim | 6 ++++++ libp2p/protocols/pubsub/gossipsub.nim | 29 ++++++++++++++++++++------- libp2p/protocols/pubsub/pubsub.nim | 16 +++++++++------ libp2p/switch.nim | 7 +++++++ 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 2be053c..6df139e 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -152,6 +152,12 @@ method unsubscribe*(f: FloodSub, for p in f.peers.values: await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) +method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = + await procCall PubSub(f).unsubscribeAll(topic) + + for p in f.peers.values: + await f.sendSubs(p, @[topic], false) + method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.peers = initTable[string, PubSubPeer]() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0c84877..e3d6ed2 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -450,14 +450,29 @@ method unsubscribe*(g: GossipSub, topics: seq[TopicPair]) {.async.} = await procCall PubSub(g).unsubscribe(topics) - for pair in topics: - let topic = pair.topic - if topic in g.mesh: - let peers = g.mesh.getOrDefault(topic) - g.mesh.del(topic) + for (topic, handler) in topics: + # delete from mesh only if no handlers are left + if g.topics[topic].handler.len <= 0: + if topic in g.mesh: + let peers = g.mesh.getOrDefault(topic) + g.mesh.del(topic) - for peer in peers: - await peer.sendPrune(@[topic]) + var pending = newSeq[Future[void]]() + for peer in peers: + pending.add(peer.sendPrune(@[topic])) + checkFutures(await allFinished(pending)) + +method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = + await procCall PubSub(g).unsubscribeAll(topic) + + if topic in g.mesh: + let peers = g.mesh.getOrDefault(topic) + g.mesh.del(topic) + + var pending = newSeq[Future[void]]() + for peer in peers: + pending.add(peer.sendPrune(@[topic])) + checkFutures(await allFinished(pending)) method publish*(g: GossipSub, topic: string, diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 3cf0271..c4d17fe 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -233,16 +233,16 @@ method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: - # metrics - libp2p_pubsub_topics.dec() for i, h in p.topics[t.topic].handler: if h == t.handler: p.topics[t.topic].handler.del(i) - # make sure we delete the topic if - # no more handlers are left - if p.topics[t.topic].handler.len <= 0: - p.topics.del(t.topic) + # make sure we delete the topic if + # no more handlers are left + if p.topics[t.topic].handler.len <= 0: + p.topics.del(t.topic) + # metrics + libp2p_pubsub_topics.dec() proc unsubscribe*(p: PubSub, topic: string, @@ -250,6 +250,10 @@ proc unsubscribe*(p: PubSub, ## unsubscribe from a ``topic`` string p.unsubscribe(@[(topic, handler)]) +method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} = + libp2p_pubsub_topics.dec() + p.topics.del(topic) + method subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.base, async.} = diff --git a/libp2p/switch.nim b/libp2p/switch.nim index ea40b84..dc62f6a 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -449,6 +449,13 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]) {.async.} = await s.pubSub.get().unsubscribe(topics) +proc unsubscribeAll*(s: Switch, topic: string) {.async.} = + ## unsubscribe from topics + if s.pubSub.isNone: + raise newNoPubSubException() + + await s.pubSub.get().unsubscribeAll(topic) + proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} = # pubslish to pubsub topic if s.pubSub.isNone: