diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f92949dc0..575dd31e0 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -471,21 +471,43 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) = # Send unsubscribe (in reverse order to sub/graft) procCall PubSub(g).onTopicSubscription(topic, subscribed) -method publish*(g: GossipSub, - topic: string, - data: seq[byte]): Future[int] {.async.} = - # base returns always 0 - discard await procCall PubSub(g).publish(topic, data) +proc calculateMaxNumPeersFloodPublish*(g: GossipSub, data: seq[byte]): int64 = + let + msgSize = data.len + bandwidth = 12_500_000 div 1000 # 100 Mbps or 12.5 MBps or 12_500 bytes/ms TODO replace with bandwidth estimate + msToTransmit = max(msgSize div bandwidth, 1) + return max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow) - logScope: - topic +proc addPeersForFloodPublish(g: GossipSub, topic: string, peers: var HashSet[PubSubPeer], maxNumPeersFloodPublish: int64) = + # With flood publishing enabled, the mesh is used when propagating messages from other peers, + # but a peer's own messages will always be published to all known peers in the topic, limited + # to the amount of peers we can send it to in one heartbeat + for peer in g.gossipsub.getOrDefault(topic): + if peers.len >= maxNumPeersFloodPublish: break + if peer.score >= g.parameters.publishThreshold: + trace "publish: including flood/high score peer", peer + peers.incl(peer) - trace "Publishing message on topic", data = data.shortLog +proc addFanoutPeers(g: GossipSub, topic: string, peers: var HashSet[PubSubPeer]) = + var fanoutPeers = g.fanout.getOrDefault(topic).toSeq() + if fanoutPeers.len < g.parameters.dLow: + g.replenishFanout(topic) + fanoutPeers = g.fanout.getOrDefault(topic).toSeq() - if topic.len <= 0: # data could be 0/empty - debug "Empty topic, skipping publish" - return 0 + g.rng.shuffle(fanoutPeers) + for fanPeer in fanoutPeers: + peers.incl(fanPeer) + if peers.len > g.parameters.d: break + + # even if we couldn't publish, + # we still attempted to publish + # on the topic, so it makes sense + # to update the last topic publish + # time + g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL) + +proc getDirectAndMeshPeers(g: GossipSub, topic: string): HashSet[PubSubPeer] = var peers: HashSet[PubSubPeer] # add always direct peers @@ -494,52 +516,9 @@ method publish*(g: GossipSub, if topic in g.topics: # if we're subscribed use the mesh peers.incl(g.mesh.getOrDefault(topic)) - if g.parameters.floodPublish: - let - msgSize = data.len - bandwidth = 25_000_000 #TODO replace with bandwidth estimate - msToTransmit = max(msgSize div (bandwidth div 1000), 1) - maxFloodPublish = - max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow) - # With flood publishing enabled, the mesh is used when propagating messages from other peers, - # but a peer's own messages will always be published to all known peers in the topic, limited - # to the amount of peers we can send it to in one heartbeat - for peer in g.gossipsub.getOrDefault(topic): - if peers.len >= maxFloodPublish: break - if peer.score >= g.parameters.publishThreshold: - trace "publish: including flood/high score peer", peer - peers.incl(peer) - - if peers.len < g.parameters.dLow: - # not subscribed, or bad mesh, send to fanout peers - var fanoutPeers = g.fanout.getOrDefault(topic).toSeq() - if fanoutPeers.len < g.parameters.dLow: - g.replenishFanout(topic) - fanoutPeers = g.fanout.getOrDefault(topic).toSeq() - - g.rng.shuffle(fanoutPeers) - fanoutPeers.capLen(g.parameters.d - peers.len) - - for fanPeer in fanoutPeers: - peers.incl(fanPeer) - if peers.len > g.parameters.d: break - - # even if we couldn't publish, - # we still attempted to publish - # on the topic, so it makes sense - # to update the last topic publish - # time - g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL) - - if peers.len == 0: - let topicPeers = g.gossipsub.getOrDefault(topic).toSeq() - debug "No peers for topic, skipping publish", peersOnTopic = topicPeers.len, - connectedPeers = topicPeers.filterIt(it.connected).len, - topic - # skipping topic as our metrics finds that heavy - libp2p_gossipsub_failed_publish.inc() - return 0 + return peers +proc publishMessage(g: GossipSub, topic: string, data: seq[byte], peers: HashSet[PubSubPeer]): int {.raises: [LPError].} = let msg = if g.anonymize: @@ -572,9 +551,44 @@ method publish*(g: GossipSub, libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) trace "Published message to peers", peers=peers.len - return peers.len +method publish*(g: GossipSub, + topic: string, + data: seq[byte]): Future[int] {.async.} = + # base returns always 0 + discard await procCall PubSub(g).publish(topic, data) + + logScope: + topic + + trace "Publishing message on topic", data = data.shortLog + + if topic.len <= 0: # data could be 0/empty + debug "Empty topic, skipping publish" + return 0 + + var peers = g.getDirectAndMeshPeers(topic) + + if g.parameters.floodPublish: + let maxNumPeersFloodPublish = calculateMaxNumPeersFloodPublish(g, data) + addPeersForFloodPublish(g, topic, peers, maxNumPeersFloodPublish) + + if peers.len < g.parameters.dLow: + # not subscribed, or bad mesh, send to fanout peers + addFanoutPeers(g, topic, peers) + + if peers.len == 0: + let topicPeers = g.gossipsub.getOrDefault(topic).toSeq() + debug "No peers for topic, skipping publish", peersOnTopic = topicPeers.len, + connectedPeers = topicPeers.filterIt(it.connected).len, + topic + # skipping topic as our metrics finds that heavy + libp2p_gossipsub_failed_publish.inc() + return 0 + + return g.publishMessage(topic, data, peers) + proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = let peer = g.peers.getOrDefault(id) if isNil(peer): diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 2bdbdad25..c1717b791 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -665,14 +665,16 @@ suite "GossipSub": await sleepAsync(10.milliseconds) check false - check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17 + check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow + + check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17 # Now try with a mesh gossip1.subscribe("foobar", handler) checkExpiring: gossip1.mesh.peers("foobar") > 5 # use a different length so that the message is not equal to the last - check (await nodes[0].publish("foobar", newSeq[byte](1_000_001))) == 17 + check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17 await allFuturesThrowing( nodes.mapIt(it.switch.stop())