diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 7f68e6d13..a78d6f96a 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -117,8 +117,9 @@ method subscribeToPeer*(p: FloodSub, method publish*(f: FloodSub, topic: string, - data: seq[byte]) {.async.} = - await procCall PubSub(f).publish(topic, data) + data: seq[byte]): Future[int] {.async.} = + # base returns always 0 + discard await procCall PubSub(f).publish(topic, data) if data.len <= 0 or topic.len <= 0: trace "topic or data missing, skipping publish" @@ -143,6 +144,8 @@ method publish*(f: FloodSub, libp2p_pubsub_messages_published.inc(labelValues = [topic]) + return sent.filterIt(not it.failed).len + method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = await procCall PubSub(f).unsubscribe(topics) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 3d3fc366a..8697de340 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -56,6 +56,7 @@ type heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatRunning: bool heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats + subLock: AsyncLock declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"]) @@ -75,8 +76,8 @@ method init*(g: GossipSub) = proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic - debug "about to replenish fanout", topic - + debug "about to replenish fanout", topic, avail = g.gossipsub[topic].len + var topicHash = g.fanout.mgetOrPut(topic, initHashSet[string]()) if topicHash.len < GossipSubDLo: @@ -87,14 +88,16 @@ proc replenishFanout(g: GossipSub, topic: string) = # set the fanout expiry time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) if topicHash.len == GossipSubD: - break + break libp2p_gossipsub_peers_per_topic_fanout.set(topicHash.len.int64, labelValues = [topic]) - debug "fanout replenished with peers", peers = topicHash.len + debug "fanout replenished with peers", peers = g.fanout[topic].len proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = try: trace "about to rebalance mesh" + + await g.subLock.acquire() var topicHash = g.mesh.mgetOrPut(topic, initHashSet[string]()) @@ -106,10 +109,9 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # replenish the mesh if we're below GossipSubDlo while topicHash.len < GossipSubD: trace "gathering peers", peers = topicHash.len - await sleepAsync(1.millis) # don't starve the event loop var id: string if fanOutHash.len > 0: - trace "getting peer from fanout", topic, + debug "getting peer from fanout", topic, peers = fanOutHash.len id = sample(toSeq(fanOutHash)) @@ -120,7 +122,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "got fanout peer", peer = id elif gossipHash.len > 0: - trace "getting peer from gossipsub", topic, + debug "getting peer from gossipsub", topic, peers = gossipHash.len id = sample(toSeq(gossipHash)) @@ -161,10 +163,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = libp2p_gossipsub_peers_per_topic_mesh .set(topicHash.len.int64, labelValues = [topic]) - trace "mesh balanced, got peers", peers = topicHash.len, + debug "mesh balanced, got peers", peers = g.mesh[topic].len, topicId = topic except CatchableError as exc: trace "exception occurred re-balancing mesh", exc = exc.msg + finally: + g.subLock.release() proc dropFanoutPeers(g: GossipSub) {.async.} = # drop peers that we haven't published to in @@ -174,6 +178,7 @@ proc dropFanoutPeers(g: GossipSub) {.async.} = if Moment.now > val: dropping.add(topic) g.fanout.del(topic) + debug "dropping fanout topic", topic for topic in dropping: g.lastFanoutPubSub.del(topic) @@ -221,6 +226,7 @@ proc heartbeat(g: GossipSub) {.async.} = for t in toSeq(g.topics.keys): await g.rebalanceMesh(t) + g.replenishFanout(t) await g.dropFanoutPeers() let peers = g.getGossipPeers() @@ -281,20 +287,26 @@ method subscribeTopic*(g: GossipSub, peerId: string) {.gcsafe, async.} = await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) - if topic notin g.gossipsub: - g.gossipsub[topic] = initHashSet[string]() + try: + await g.subLock.acquire() - if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic - # subscribe remote peer to the topic - g.gossipsub[topic].incl(peerId) - else: - trace "removing subscription for topic", peer = peerId, name = topic - # unsubscribe remote peer from the topic - g.gossipsub[topic].excl(peerId) + var gossipHash = g.gossipsub.mgetOrPut(topic, initHashSet[string]()) + + if subscribe: + debug "adding subscription for topic", peer = peerId, name = topic + # subscribe remote peer to the topic + gossipHash.incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe remote peer from the topic + gossipHash.excl(peerId) + finally: + g.subLock.release() libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + + debug "gossip peers", peers = g.gossipsub[topic].len, topic if topic in g.topics: await g.rebalanceMesh(topic) @@ -385,7 +397,6 @@ method rpcHandler*(g: GossipSub, continue for t in msg.topicIDs: # for every topic in the message - await g.rebalanceMesh(t) # gather peers for each topic if t in g.floodsub: toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic @@ -458,48 +469,48 @@ method unsubscribe*(g: GossipSub, method publish*(g: GossipSub, topic: string, - data: seq[byte]) {.async.} = - await procCall PubSub(g).publish(topic, data) + data: seq[byte]): Future[int] {.async.} = + # base returns always 0 + discard await procCall PubSub(g).publish(topic, data) trace "about to publish message on topic", name = topic, data = data.shortLog var peers: HashSet[string] - # TODO: we probably don't need to try multiple times - if data.len > 0 and topic.len > 0: - for _ in 0..<5: # try to get peers up to 5 times - if peers.len > 0: - break - if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh - await g.rebalanceMesh(topic) - peers = g.mesh.getOrDefault(topic) - else: # send to fanout peers - await g.replenishFanout(topic) - if topic in g.fanout: - peers = g.fanout.getOrDefault(topic) - # set the fanout expiry time - g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) - - # wait a second between tries - await sleepAsync(1.seconds) + if topic.len > 0: # data could be 0/empty + if topic in g.topics: # if we're subscribed use the mesh + peers = g.mesh.getOrDefault(topic) + else: # not subscribed, send to fanout peers + peers = g.fanout.getOrDefault(topic) let msg = newMessage(g.peerInfo, data, topic, g.sign) trace "created new message", msg + + trace "publishing on topic", name = topic, peers = peers + if msg.msgId notin g.mcache: + g.mcache.put(msg) + var sent: seq[Future[void]] for p in peers: + # avoid sending to self if p == g.peerInfo.id: continue - trace "publishing on topic", name = topic - if msg.msgId notin g.mcache: - g.mcache.put(msg) - - if p in g.peers: - sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) - checkFutures(await allFinished(sent)) + let peer = g.peers.getOrDefault(p) + if not isNil(peer.peerInfo): + trace "publish: sending message to peer", peer = p + sent.add(peer.send(@[RPCMsg(messages: @[msg])])) + + sent = await allFinished(sent) + checkFutures(sent) libp2p_pubsub_messages_published.inc(labelValues = [topic]) + return sent.filterIt(not it.failed).len + else: + return 0 + + method start*(g: GossipSub) {.async.} = debug "gossipsub start" @@ -543,3 +554,4 @@ method initPubSub*(g: GossipSub) = g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.control = initTable[string, ControlMessage]() # pending control messages g.heartbeatLock = newAsyncLock() + g.subLock = newAsyncLock() diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index c893892cf..d0814c17e 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -225,8 +225,7 @@ method subscribe*(p: PubSub, method publish*(p: PubSub, topic: string, - data: seq[byte]) {.base, async.} = - # TODO: Should throw indicating success/failure + data: seq[byte]): Future[int] {.base, async.} = ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: @@ -241,6 +240,8 @@ method publish*(p: PubSub, # more cleanup though debug "Could not write to pubsub connection", msg = exc.msg + return 0 + method initPubSub*(p: PubSub) {.base.} = ## perform pubsub initialization p.observers = new(seq[PubSubObserver]) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index bf0463156..9d9a01470 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -556,7 +556,7 @@ proc subscribe*(s: Switch, topic: string, retFuture.fail(newNoPubSubException()) return retFuture - result = s.pubSub.get().subscribe(topic, handler) + return s.pubSub.get().subscribe(topic, handler) proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] = ## unsubscribe from topics @@ -565,16 +565,16 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] = retFuture.fail(newNoPubSubException()) return retFuture - result = s.pubSub.get().unsubscribe(topics) + return s.pubSub.get().unsubscribe(topics) -proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] = +proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] = # pubslish to pubsub topic if s.pubSub.isNone: - var retFuture = newFuture[void]("Switch.publish") + var retFuture = newFuture[int]("Switch.publish") retFuture.fail(newNoPubSubException()) return retFuture - result = s.pubSub.get().publish(topic, data) + return s.pubSub.get().publish(topic, data) proc addValidator*(s: Switch, topics: varargs[string], diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index a72d67c3d..57c2069e3 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -43,6 +43,14 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = dec ceil doAssert(ceil > 0, "waitSub timeout!") +template tryPublish(call: untyped, require: int, times: int = 10, wait: Duration = 1.seconds): untyped = + var limit = times + while (call) < require and limit > 0: + await sleepAsync(wait) + limit.dec() + if limit == 0: + doAssert(false, "Failed to publish!") + suite "GossipSub": teardown: for tracker in testTrackers(): @@ -64,9 +72,7 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[0].subscribe("foobar", handler) - await waitSub(nodes[1], nodes[0], "foobar") await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -77,8 +83,8 @@ suite "GossipSub": result = true nodes[1].addValidator("foobar", validator) - await nodes[0].publish("foobar", "Hello!".toBytes()) - + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + result = (await validatorFut) and (await handlerFut) await allFuturesThrowing( nodes[0].stop(), @@ -101,17 +107,16 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): Future[bool] {.async.} = - validatorFut.complete(true) result = false + validatorFut.complete(true) nodes[1].addValidator("foobar", validator) - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 result = await validatorFut await allFuturesThrowing( @@ -135,10 +140,9 @@ suite "GossipSub": awaiters.add((await nodes[1].start())) await subscribeNodes(nodes) + await nodes[1].subscribe("foo", handler) - await waitSub(nodes[0], nodes[1], "foo") await nodes[1].subscribe("bar", handler) - await waitSub(nodes[0], nodes[1], "bar") var passed, failed: Future[bool] = newFuture[bool]() proc validator(topic: string, @@ -152,8 +156,8 @@ suite "GossipSub": false nodes[1].addValidator("foo", "bar", validator) - await nodes[0].publish("foo", "Hello!".toBytes()) - await nodes[0].publish("bar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 + tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 result = ((await passed) and (await failed) and (await handlerFut)) await allFuturesThrowing( @@ -179,7 +183,7 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(1.seconds) + await sleepAsync(10.seconds) let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip2 = GossipSub(nodes[1].pubSub.get()) @@ -273,7 +277,7 @@ suite "GossipSub": nodes[1].pubsub.get().addObserver(obs1) nodes[0].pubsub.get().addObserver(obs2) - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) @@ -310,7 +314,7 @@ suite "GossipSub": await nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 result = await passed @@ -353,10 +357,10 @@ suite "GossipSub": await allFuturesThrowing(subs) - await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), - 1.minutes) + tryPublish await wait(nodes[0].publish("foobar", + cast[seq[byte]]("from node " & + nodes[1].peerInfo.id)), + 1.minutes), runs await wait(seenFut, 2.minutes) check: seen.len >= runs @@ -401,10 +405,10 @@ suite "GossipSub": subs &= dialer.subscribe("foobar", handler) await allFuturesThrowing(subs) - await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), - 1.minutes) + tryPublish await wait(nodes[0].publish("foobar", + cast[seq[byte]]("from node " & + nodes[1].peerInfo.id)), + 1.minutes), runs await wait(seenFut, 5.minutes) check: seen.len >= runs