mirror of https://github.com/vacp2p/nim-libp2p.git
GossipSub: Limit flood publishing (#911)
Co-authored-by: Diego <diego@status.im>
This commit is contained in:
parent
b784167805
commit
7a369dd1bf
|
@ -511,32 +511,36 @@ method publish*(g: GossipSub,
|
||||||
|
|
||||||
var peers: HashSet[PubSubPeer]
|
var peers: HashSet[PubSubPeer]
|
||||||
|
|
||||||
if g.parameters.floodPublish:
|
|
||||||
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
|
|
||||||
# but a peer's own messages will always be published to all known peers in the topic.
|
|
||||||
for peer in g.gossipsub.getOrDefault(topic):
|
|
||||||
if peer.score >= g.parameters.publishThreshold:
|
|
||||||
trace "publish: including flood/high score peer", peer
|
|
||||||
peers.incl(peer)
|
|
||||||
|
|
||||||
# add always direct peers
|
# add always direct peers
|
||||||
peers.incl(g.explicit.getOrDefault(topic))
|
peers.incl(g.explicit.getOrDefault(topic))
|
||||||
|
|
||||||
if topic in g.topics: # if we're subscribed use the mesh
|
if topic in g.topics: # if we're subscribed use the mesh
|
||||||
peers.incl(g.mesh.getOrDefault(topic))
|
peers.incl(g.mesh.getOrDefault(topic))
|
||||||
|
|
||||||
if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
|
if g.parameters.floodPublish:
|
||||||
# not subscribed or bad mesh, send to fanout peers
|
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
|
||||||
# disable for floodPublish, since we already sent to every good peer
|
# but a peer's own messages will always be published to all known peers in the topic, limited
|
||||||
#
|
# to the amount of peers we can send it to in one heartbeat
|
||||||
|
let
|
||||||
|
bandwidth = 12_500_000 div 1000 # 100 Mbps or 12.5 MBps TODO replace with bandwidth estimate
|
||||||
|
msToTransmit = max(data.len div bandwidth, 1)
|
||||||
|
maxPeersToFlod =
|
||||||
|
max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
|
||||||
|
|
||||||
|
for peer in g.gossipsub.getOrDefault(topic):
|
||||||
|
if peers.len >= maxPeersToFlod: break
|
||||||
|
if peer.score >= g.parameters.publishThreshold:
|
||||||
|
trace "publish: including flood/high score peer", peer
|
||||||
|
peers.incl(peer)
|
||||||
|
|
||||||
|
if peers.len < g.parameters.dLow:
|
||||||
|
# not subscribed, or bad mesh, send to fanout peers
|
||||||
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||||
if fanoutPeers.len == 0:
|
if fanoutPeers.len < g.parameters.dLow:
|
||||||
g.replenishFanout(topic)
|
g.replenishFanout(topic)
|
||||||
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||||
|
|
||||||
g.rng.shuffle(fanoutPeers)
|
g.rng.shuffle(fanoutPeers)
|
||||||
if fanoutPeers.len + peers.len > g.parameters.d:
|
|
||||||
fanoutPeers.setLen(g.parameters.d - peers.len)
|
|
||||||
|
|
||||||
for fanPeer in fanoutPeers:
|
for fanPeer in fanoutPeers:
|
||||||
peers.incl(fanPeer)
|
peers.incl(fanPeer)
|
||||||
|
@ -554,7 +558,6 @@ method publish*(g: GossipSub,
|
||||||
debug "No peers for topic, skipping publish", peersOnTopic = topicPeers.len,
|
debug "No peers for topic, skipping publish", peersOnTopic = topicPeers.len,
|
||||||
connectedPeers = topicPeers.filterIt(it.connected).len,
|
connectedPeers = topicPeers.filterIt(it.connected).len,
|
||||||
topic
|
topic
|
||||||
# skipping topic as our metrics finds that heavy
|
|
||||||
libp2p_gossipsub_failed_publish.inc()
|
libp2p_gossipsub_failed_publish.inc()
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
@ -590,7 +593,6 @@ method publish*(g: GossipSub,
|
||||||
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
|
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
|
||||||
|
|
||||||
trace "Published message to peers", peers=peers.len
|
trace "Published message to peers", peers=peers.len
|
||||||
|
|
||||||
return peers.len
|
return peers.len
|
||||||
|
|
||||||
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
|
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
|
||||||
|
|
|
@ -556,8 +556,8 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
|
||||||
logScope: topic
|
logScope: topic
|
||||||
trace "about to replenish fanout"
|
trace "about to replenish fanout"
|
||||||
|
|
||||||
let currentMesh = g.mesh.getOrDefault(topic)
|
|
||||||
if g.fanout.peers(topic) < g.parameters.dLow:
|
if g.fanout.peers(topic) < g.parameters.dLow:
|
||||||
|
let currentMesh = g.mesh.getOrDefault(topic)
|
||||||
trace "replenishing fanout", peers = g.fanout.peers(topic)
|
trace "replenishing fanout", peers = g.fanout.peers(topic)
|
||||||
for peer in g.gossipsub.getOrDefault(topic):
|
for peer in g.gossipsub.getOrDefault(topic):
|
||||||
if peer in currentMesh: continue
|
if peer in currentMesh: continue
|
||||||
|
|
|
@ -70,6 +70,10 @@ template safeConvert*[T: SomeInteger, S: Ordinal](value: S): T =
|
||||||
else:
|
else:
|
||||||
{.error: "Source and target types have an incompatible range low..high".}
|
{.error: "Source and target types have an incompatible range low..high".}
|
||||||
|
|
||||||
|
proc capLen*[T](s: var seq[T], length: Natural) =
|
||||||
|
if s.len > length:
|
||||||
|
s.setLen(length)
|
||||||
|
|
||||||
template exceptionToAssert*(body: untyped): untyped =
|
template exceptionToAssert*(body: untyped): untyped =
|
||||||
block:
|
block:
|
||||||
var res: type(body)
|
var res: type(body)
|
||||||
|
|
|
@ -628,7 +628,6 @@ suite "GossipSub":
|
||||||
"foobar" in gossip1.gossipsub
|
"foobar" in gossip1.gossipsub
|
||||||
"foobar" notin gossip2.gossipsub
|
"foobar" notin gossip2.gossipsub
|
||||||
not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
||||||
not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
nodes[0].switch.stop(),
|
nodes[0].switch.stop(),
|
||||||
|
@ -637,6 +636,50 @@ suite "GossipSub":
|
||||||
|
|
||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
|
asyncTest "e2e - GossipSub floodPublish limit":
|
||||||
|
var passed: Future[bool] = newFuture[bool]()
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
|
||||||
|
let
|
||||||
|
nodes = generateNodes(
|
||||||
|
20,
|
||||||
|
gossip = true)
|
||||||
|
|
||||||
|
await allFuturesThrowing(
|
||||||
|
nodes.mapIt(it.switch.start())
|
||||||
|
)
|
||||||
|
|
||||||
|
var gossip1: GossipSub = GossipSub(nodes[0])
|
||||||
|
gossip1.parameters.floodPublish = true
|
||||||
|
gossip1.parameters.heartbeatInterval = milliseconds(700)
|
||||||
|
|
||||||
|
for node in nodes[1..^1]:
|
||||||
|
node.subscribe("foobar", handler)
|
||||||
|
await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs)
|
||||||
|
|
||||||
|
block setup:
|
||||||
|
for i in 0..<50:
|
||||||
|
if (await nodes[0].publish("foobar", ("Hello!" & $i).toBytes())) == 19:
|
||||||
|
break setup
|
||||||
|
await sleepAsync(10.milliseconds)
|
||||||
|
check false
|
||||||
|
|
||||||
|
check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow
|
||||||
|
|
||||||
|
check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == 17
|
||||||
|
|
||||||
|
# Now try with a mesh
|
||||||
|
gossip1.subscribe("foobar", handler)
|
||||||
|
checkExpiring: gossip1.mesh.peers("foobar") > 5
|
||||||
|
|
||||||
|
# use a different length so that the message is not equal to the last
|
||||||
|
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17
|
||||||
|
|
||||||
|
await allFuturesThrowing(
|
||||||
|
nodes.mapIt(it.switch.stop())
|
||||||
|
)
|
||||||
|
|
||||||
asyncTest "e2e - GossipSub with multiple peers":
|
asyncTest "e2e - GossipSub with multiple peers":
|
||||||
var runs = 10
|
var runs = 10
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue