Limit flood publishing

This commit is contained in:
Tanguy 2023-06-09 10:26:39 +02:00
parent c76d1e18ef
commit f2209dc299
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
2 changed files with 57 additions and 8 deletions

View File

@ -488,20 +488,28 @@ method publish*(g: GossipSub,
var peers: HashSet[PubSubPeer]
if g.parameters.floodPublish:
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic.
for peer in g.gossipsub.getOrDefault(topic):
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)
# add always direct peers
peers.incl(g.explicit.getOrDefault(topic))
if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic))
if g.parameters.floodPublish:
let
msgSize = data.len
bandwidth = 25_000_000 #TODO replace with bandwidth estimate
msToTransmit = max(msgSize div (bandwidth div 1000), 1)
maxFloodPublish =
(g.parameters.heartbeatInterval.milliseconds div msToTransmit)
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic, limited
# to the amount of peers we can send it to in one heartbeat
for peer in g.gossipsub.getOrDefault(topic):
if peers.len >= maxFloodPublish: break
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)
if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
# not subscribed or bad mesh, send to fanout peers
# disable for floodPublish, since we already sent to every good peer

View File

@ -637,6 +637,47 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub floodPublish limit":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
let
nodes = generateNodes(
20,
gossip = true)
await allFuturesThrowing(
nodes.mapIt(it.switch.start())
)
var gossip1: GossipSub = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
for node in nodes[1..^1]:
node.subscribe("foobar", handler)
await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs)
block setup:
for _ in 0..10:
if (await nodes[0].publish("foobar", "Hello!".toBytes())) == 19:
break setup
await sleepAsync(1.milliseconds)
check false
check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17
# Now try with a mesh
gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5
check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17
await allFuturesThrowing(
nodes.mapIt(it.switch.stop())
)
asyncTest "e2e - GossipSub with multiple peers":
var runs = 10