From f2209dc29936ac0ed7f37f375a9bb069c54ea47b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 9 Jun 2023 10:26:39 +0200 Subject: [PATCH] Limit flood publishing --- libp2p/protocols/pubsub/gossipsub.nim | 24 ++++++++++------ tests/pubsub/testgossipsub.nim | 41 +++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 75fc48d9a..ac945b60c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -488,20 +488,28 @@ method publish*(g: GossipSub, var peers: HashSet[PubSubPeer] - if g.parameters.floodPublish: - # With flood publishing enabled, the mesh is used when propagating messages from other peers, - # but a peer's own messages will always be published to all known peers in the topic. - for peer in g.gossipsub.getOrDefault(topic): - if peer.score >= g.parameters.publishThreshold: - trace "publish: including flood/high score peer", peer - peers.incl(peer) - # add always direct peers peers.incl(g.explicit.getOrDefault(topic)) if topic in g.topics: # if we're subscribed use the mesh peers.incl(g.mesh.getOrDefault(topic)) + if g.parameters.floodPublish: + let + msgSize = data.len + bandwidth = 25_000_000 #TODO replace with bandwidth estimate + msToTransmit = max(msgSize div (bandwidth div 1000), 1) + maxFloodPublish = + (g.parameters.heartbeatInterval.milliseconds div msToTransmit) + # With flood publishing enabled, the mesh is used when propagating messages from other peers, + # but a peer's own messages will always be published to all known peers in the topic, limited + # to the amount of peers we can send it to in one heartbeat + for peer in g.gossipsub.getOrDefault(topic): + if peers.len >= maxFloodPublish: break + if peer.score >= g.parameters.publishThreshold: + trace "publish: including flood/high score peer", peer + peers.incl(peer) + if peers.len < g.parameters.dLow and g.parameters.floodPublish == false: # not subscribed or bad mesh, send to fanout peers # disable for floodPublish, since we already sent to every good peer diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 396bccbab..96767fd3c 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -637,6 +637,47 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub floodPublish limit": + var passed: Future[bool] = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + + let + nodes = generateNodes( + 20, + gossip = true) + + await allFuturesThrowing( + nodes.mapIt(it.switch.start()) + ) + + var gossip1: GossipSub = GossipSub(nodes[0]) + gossip1.parameters.floodPublish = true + gossip1.parameters.heartbeatInterval = milliseconds(700) + + for node in nodes[1..^1]: + node.subscribe("foobar", handler) + await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs) + + block setup: + for _ in 0..10: + if (await nodes[0].publish("foobar", "Hello!".toBytes())) == 19: + break setup + await sleepAsync(1.milliseconds) + check false + + check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17 + + # Now try with a mesh + gossip1.subscribe("foobar", handler) + checkExpiring: gossip1.mesh.peers("foobar") > 5 + + check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17 + + await allFuturesThrowing( + nodes.mapIt(it.switch.stop()) + ) + asyncTest "e2e - GossipSub with multiple peers": var runs = 10