From 7a369dd1bfbc09cbdb158e70f1eef8c6d1296c9b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 31 Jul 2023 11:13:51 +0200 Subject: [PATCH] GossipSub: Limit flood publishing (#911) Co-authored-by: Diego --- libp2p/protocols/pubsub/gossipsub.nim | 36 ++++++++------- .../protocols/pubsub/gossipsub/behavior.nim | 2 +- libp2p/utility.nim | 4 ++ tests/pubsub/testgossipsub.nim | 45 ++++++++++++++++++- 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 12bfb02b3..8bb88bfa3 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -511,32 +511,36 @@ method publish*(g: GossipSub, var peers: HashSet[PubSubPeer] - if g.parameters.floodPublish: - # With flood publishing enabled, the mesh is used when propagating messages from other peers, - # but a peer's own messages will always be published to all known peers in the topic. - for peer in g.gossipsub.getOrDefault(topic): - if peer.score >= g.parameters.publishThreshold: - trace "publish: including flood/high score peer", peer - peers.incl(peer) - # add always direct peers peers.incl(g.explicit.getOrDefault(topic)) if topic in g.topics: # if we're subscribed use the mesh peers.incl(g.mesh.getOrDefault(topic)) - if peers.len < g.parameters.dLow and g.parameters.floodPublish == false: - # not subscribed or bad mesh, send to fanout peers - # disable for floodPublish, since we already sent to every good peer - # + if g.parameters.floodPublish: + # With flood publishing enabled, the mesh is used when propagating messages from other peers, + # but a peer's own messages will always be published to all known peers in the topic, limited + # to the amount of peers we can send it to in one heartbeat + let + bandwidth = 12_500_000 div 1000 # 100 Mbps or 12.5 MBps TODO replace with bandwidth estimate + msToTransmit = max(data.len div bandwidth, 1) + maxPeersToFlod = + max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow) + + for peer in g.gossipsub.getOrDefault(topic): + if peers.len >= maxPeersToFlod: break + if peer.score >= g.parameters.publishThreshold: + trace "publish: including flood/high score peer", peer + peers.incl(peer) + + if peers.len < g.parameters.dLow: + # not subscribed, or bad mesh, send to fanout peers var fanoutPeers = g.fanout.getOrDefault(topic).toSeq() - if fanoutPeers.len == 0: + if fanoutPeers.len < g.parameters.dLow: g.replenishFanout(topic) fanoutPeers = g.fanout.getOrDefault(topic).toSeq() g.rng.shuffle(fanoutPeers) - if fanoutPeers.len + peers.len > g.parameters.d: - fanoutPeers.setLen(g.parameters.d - peers.len) for fanPeer in fanoutPeers: peers.incl(fanPeer) @@ -554,7 +558,6 @@ method publish*(g: GossipSub, debug "No peers for topic, skipping publish", peersOnTopic = topicPeers.len, connectedPeers = topicPeers.filterIt(it.connected).len, topic - # skipping topic as our metrics finds that heavy libp2p_gossipsub_failed_publish.inc() return 0 @@ -590,7 +593,6 @@ method publish*(g: GossipSub, libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) trace "Published message to peers", peers=peers.len - return peers.len proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index e4b193549..326c48763 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -556,8 +556,8 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} = logScope: topic trace "about to replenish fanout" - let currentMesh = g.mesh.getOrDefault(topic) if g.fanout.peers(topic) < g.parameters.dLow: + let currentMesh = g.mesh.getOrDefault(topic) trace "replenishing fanout", peers = g.fanout.peers(topic) for peer in g.gossipsub.getOrDefault(topic): if peer in currentMesh: continue diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 85995bee4..3b0a71345 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -70,6 +70,10 @@ template safeConvert*[T: SomeInteger, S: Ordinal](value: S): T = else: {.error: "Source and target types have an incompatible range low..high".} +proc capLen*[T](s: var seq[T], length: Natural) = + if s.len > length: + s.setLen(length) + template exceptionToAssert*(body: untyped): untyped = block: var res: type(body) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index f3d698cbc..81b46f4d7 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -628,7 +628,6 @@ suite "GossipSub": "foobar" in gossip1.gossipsub "foobar" notin gossip2.gossipsub not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) - not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) await allFuturesThrowing( nodes[0].switch.stop(), @@ -637,6 +636,50 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub floodPublish limit": + var passed: Future[bool] = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + + let + nodes = generateNodes( + 20, + gossip = true) + + await allFuturesThrowing( + nodes.mapIt(it.switch.start()) + ) + + var gossip1: GossipSub = GossipSub(nodes[0]) + gossip1.parameters.floodPublish = true + gossip1.parameters.heartbeatInterval = milliseconds(700) + + for node in nodes[1..^1]: + node.subscribe("foobar", handler) + await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs) + + block setup: + for i in 0..<50: + if (await nodes[0].publish("foobar", ("Hello!" & $i).toBytes())) == 19: + break setup + await sleepAsync(10.milliseconds) + check false + + check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow + + check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == 17 + + # Now try with a mesh + gossip1.subscribe("foobar", handler) + checkExpiring: gossip1.mesh.peers("foobar") > 5 + + # use a different length so that the message is not equal to the last + check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17 + + await allFuturesThrowing( + nodes.mapIt(it.switch.stop()) + ) + asyncTest "e2e - GossipSub with multiple peers": var runs = 10