diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9a2a574fa..010be4f53 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -482,13 +482,23 @@ method publish*(g: GossipSub, if topic in g.topics: # if we're subscribed use the mesh peers.incl(g.mesh.getOrDefault(topic)) - else: # not subscribed, send to fanout peers - # try optimistically - peers.incl(g.fanout.getOrDefault(topic)) - if peers.len == 0: - # ok we had nothing.. let's try replenish inline + + if peers.len < g.parameters.dLow and g.parameters.floodPublish == false: + # not subscribed or bad mesh, send to fanout peers + # disable for floodPublish, since we already sent to every good peer + # + var fanoutPeers = g.fanout.getOrDefault(topic).toSeq() + if fanoutPeers.len == 0: g.replenishFanout(topic) - peers.incl(g.fanout.getOrDefault(topic)) + fanoutPeers = g.fanout.getOrDefault(topic).toSeq() + + fanoutPeers.shuffle() + if fanoutPeers.len + peers.len > g.parameters.d: + fanoutPeers.setLen(g.parameters.d - peers.len) + + for fanPeer in fanoutPeers: + peers.incl(fanPeer) + if peers.len > g.parameters.d: break # even if we couldn't publish, # we still attempted to publish diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 9bb2a946f..e54584627 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -489,9 +489,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} = logScope: topic trace "about to replenish fanout" + let currentMesh = g.mesh.getOrDefault(topic) if g.fanout.peers(topic) < g.parameters.dLow: trace "replenishing fanout", peers = g.fanout.peers(topic) for peer in g.gossipsub.getOrDefault(topic): + if peer in currentMesh: continue if g.fanout.addPeer(topic, peer): if g.fanout.peers(topic) == g.parameters.d: break diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 11eeda65c..0a67114f6 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -565,6 +565,71 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) check observed == 2 + asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": + var passed = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete() + + let + nodes = generateNodes( + 2, + gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + nodes[1].subscribe("foobar", handler) + nodes[0].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + await waitSub(nodes[1], nodes[0], "foobar") + + nodes[0].unsubscribe("foobar", handler) + + let gsNode = GossipSub(nodes[1]) + check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0) + + nodes[0].subscribe("foobar", handler) + + check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check: + GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0 + GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 + + await passed.wait(2.seconds) + + trace "test done, stopping..." + + await nodes[0].stop() + await nodes[1].stop() + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub send over mesh A -> B": var passed: Future[bool] = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =