Merge branch 'master' into gossip-one-one

This commit is contained in:
Giovanni Petrantoni 2020-07-21 11:02:02 +09:00
commit 0776cc77e7
4 changed files with 47 additions and 14 deletions

View File

@ -152,6 +152,12 @@ method unsubscribe*(f: FloodSub,
for p in f.peers.values: for p in f.peers.values:
await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
await procCall PubSub(f).unsubscribeAll(topic)
for p in f.peers.values:
await f.sendSubs(p, @[topic], false)
method initPubSub*(f: FloodSub) = method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub() procCall PubSub(f).initPubSub()
f.peers = initTable[string, PubSubPeer]() f.peers = initTable[string, PubSubPeer]()

View File

@ -817,15 +817,31 @@ method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async.} = topics: seq[TopicPair]) {.async.} =
await procCall PubSub(g).unsubscribe(topics) await procCall PubSub(g).unsubscribe(topics)
for pair in topics: for (topic, handler) in topics:
let topic = pair.topic # delete from mesh only if no handlers are left
if topic in g.mesh: if g.topics[topic].handler.len <= 0:
let peers = g.mesh.getOrDefault(topic) if topic in g.mesh:
g.mesh.del(topic) let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic)
for peer in peers: var pending = newSeq[Future[void]]()
g.pruned(peer, topic) for peer in peers:
await peer.sendPrune(@[topic]) g.pruned(peer, topic)
pending.add(peer.sendPrune(@[topic]))
checkFutures(await allFinished(pending))
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
await procCall PubSub(g).unsubscribeAll(topic)
if topic in g.mesh:
let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic)
var pending = newSeq[Future[void]]()
for peer in peers:
g.pruned(peer, topic)
pending.add(peer.sendPrune(@[topic]))
checkFutures(await allFinished(pending))
method publish*(g: GossipSub, method publish*(g: GossipSub,
topic: string, topic: string,

View File

@ -249,16 +249,16 @@ method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} = topics: seq[TopicPair]) {.base, async.} =
## unsubscribe from a list of ``topic`` strings ## unsubscribe from a list of ``topic`` strings
for t in topics: for t in topics:
# metrics
libp2p_pubsub_topics.dec()
for i, h in p.topics[t.topic].handler: for i, h in p.topics[t.topic].handler:
if h == t.handler: if h == t.handler:
p.topics[t.topic].handler.del(i) p.topics[t.topic].handler.del(i)
# make sure we delete the topic if # make sure we delete the topic if
# no more handlers are left # no more handlers are left
if p.topics[t.topic].handler.len <= 0: if p.topics[t.topic].handler.len <= 0:
p.topics.del(t.topic) p.topics.del(t.topic)
# metrics
libp2p_pubsub_topics.dec()
proc unsubscribe*(p: PubSub, proc unsubscribe*(p: PubSub,
topic: string, topic: string,
@ -266,6 +266,10 @@ proc unsubscribe*(p: PubSub,
## unsubscribe from a ``topic`` string ## unsubscribe from a ``topic`` string
p.unsubscribe(@[(topic, handler)]) p.unsubscribe(@[(topic, handler)])
method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} =
libp2p_pubsub_topics.dec()
p.topics.del(topic)
method subscribe*(p: PubSub, method subscribe*(p: PubSub,
topic: string, topic: string,
handler: TopicHandler) {.base, async.} = handler: TopicHandler) {.base, async.} =

View File

@ -482,6 +482,13 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]) {.async.} =
await s.pubSub.get().unsubscribe(topics) await s.pubSub.get().unsubscribe(topics)
proc unsubscribeAll*(s: Switch, topic: string) {.async.} =
## unsubscribe from topics
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().unsubscribeAll(topic)
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} = proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} =
# pubslish to pubsub topic # pubslish to pubsub topic
if s.pubSub.isNone: if s.pubSub.isNone: