Fix bandwidth and split publish into smaller procs

This commit is contained in:
Diego 2023-06-29 15:58:09 +02:00
parent 652e7810b6
commit c10c232424
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
2 changed files with 75 additions and 59 deletions

View File

@ -471,21 +471,43 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
# Send unsubscribe (in reverse order to sub/graft) # Send unsubscribe (in reverse order to sub/graft)
procCall PubSub(g).onTopicSubscription(topic, subscribed) procCall PubSub(g).onTopicSubscription(topic, subscribed)
proc calculateMaxNumPeersFloodPublish*(g: GossipSub, data: seq[byte]): int64 =
  ## Upper bound on how many peers a flood-published message may be sent to,
  ## derived from how long `data` takes to transmit on an assumed 100 Mbps
  ## link: we only flood to as many peers as fit within one heartbeat,
  ## but never fewer than `dLow`.
  let
    messageBytes = data.len
    # 100 Mbps = 12.5 MB/s = 12_500 bytes/ms
    # TODO replace with bandwidth estimate
    bytesPerMs = 12_500_000 div 1000
    transmitMs = max(messageBytes div bytesPerMs, 1)
  max(g.parameters.heartbeatInterval.milliseconds div transmitMs,
      g.parameters.dLow)
proc addPeersForFloodPublish(g: GossipSub, topic: string, peers: var HashSet[PubSubPeer], maxNumPeersFloodPublish: int64) =
  ## With flood publishing enabled, the mesh is used when propagating messages
  ## from other peers, but a peer's own messages are published to all known
  ## peers in the topic with an acceptable score, capped at
  ## `maxNumPeersFloodPublish` (what we can send within one heartbeat).
  for candidate in g.gossipsub.getOrDefault(topic):
    if peers.len >= maxNumPeersFloodPublish:
      break
    if candidate.score < g.parameters.publishThreshold:
      continue
    trace "publish: including flood/high score peer", peer = candidate
    peers.incl(candidate)
proc addFanoutPeers(g: GossipSub, topic: string, peers: var HashSet[PubSubPeer]) =
  ## Tops `peers` up with randomly shuffled fanout peers for `topic`,
  ## replenishing the fanout first when it holds fewer than `dLow` entries.
  ## Used when we are not subscribed (or the mesh is too small) so the
  ## message still reaches up to `d` peers.
  var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
  if fanoutPeers.len < g.parameters.dLow:
    g.replenishFanout(topic)
    fanoutPeers = g.fanout.getOrDefault(topic).toSeq()

  g.rng.shuffle(fanoutPeers)

  for fanPeer in fanoutPeers:
    # Check the cap before including: including first and then breaking on
    # `peers.len > d` could leave `d + 1` peers in the set, whereas the
    # pre-refactor code capped the candidate list at `d - peers.len`.
    if peers.len >= g.parameters.d:
      break
    peers.incl(fanPeer)

  # even if we couldn't publish, we still attempted to publish on the topic,
  # so it makes sense to update the last topic publish time
  g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
proc getDirectAndMeshPeers(g: GossipSub, topic: string): HashSet[PubSubPeer] =
var peers: HashSet[PubSubPeer] var peers: HashSet[PubSubPeer]
# add always direct peers # add always direct peers
@ -494,52 +516,9 @@ method publish*(g: GossipSub,
if topic in g.topics: # if we're subscribed use the mesh if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic)) peers.incl(g.mesh.getOrDefault(topic))
if g.parameters.floodPublish: return peers
let
msgSize = data.len
bandwidth = 25_000_000 #TODO replace with bandwidth estimate
msToTransmit = max(msgSize div (bandwidth div 1000), 1)
maxFloodPublish =
max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic, limited
# to the amount of peers we can send it to in one heartbeat
for peer in g.gossipsub.getOrDefault(topic):
if peers.len >= maxFloodPublish: break
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)
if peers.len < g.parameters.dLow:
# not subscribed, or bad mesh, send to fanout peers
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
if fanoutPeers.len < g.parameters.dLow:
g.replenishFanout(topic)
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
g.rng.shuffle(fanoutPeers)
fanoutPeers.capLen(g.parameters.d - peers.len)
for fanPeer in fanoutPeers:
peers.incl(fanPeer)
if peers.len > g.parameters.d: break
# even if we couldn't publish,
# we still attempted to publish
# on the topic, so it makes sense
# to update the last topic publish
# time
g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
if peers.len == 0:
let topicPeers = g.gossipsub.getOrDefault(topic).toSeq()
debug "No peers for topic, skipping publish", peersOnTopic = topicPeers.len,
connectedPeers = topicPeers.filterIt(it.connected).len,
topic
# skipping topic as our metrics finds that heavy
libp2p_gossipsub_failed_publish.inc()
return 0
proc publishMessage(g: GossipSub, topic: string, data: seq[byte], peers: HashSet[PubSubPeer]): int {.raises: [LPError].} =
let let
msg = msg =
if g.anonymize: if g.anonymize:
@ -572,9 +551,44 @@ method publish*(g: GossipSub,
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
trace "Published message to peers", peers=peers.len trace "Published message to peers", peers=peers.len
return peers.len return peers.len
method publish*(g: GossipSub,
                topic: string,
                data: seq[byte]): Future[int] {.async.} =
  ## Publishes `data` on `topic` and returns how many peers the message was
  ## sent to (0 when the topic name is empty or no suitable peers exist).
  # base returns always 0; called for its side effects only
  discard await procCall PubSub(g).publish(topic, data)

  logScope:
    topic

  trace "Publishing message on topic", data = data.shortLog

  if topic.len <= 0: # data could be 0/empty
    debug "Empty topic, skipping publish"
    return 0

  # direct peers always receive the message; the mesh is added when subscribed
  var peers = g.getDirectAndMeshPeers(topic)

  if g.parameters.floodPublish:
    g.addPeersForFloodPublish(topic, peers, g.calculateMaxNumPeersFloodPublish(data))

  if peers.len < g.parameters.dLow:
    # not subscribed, or bad mesh, send to fanout peers
    g.addFanoutPeers(topic, peers)

  if peers.len == 0:
    let topicPeers = g.gossipsub.getOrDefault(topic).toSeq()
    debug "No peers for topic, skipping publish",
      peersOnTopic = topicPeers.len,
      connectedPeers = topicPeers.filterIt(it.connected).len,
      topic
    # skipping topic as our metrics finds that heavy
    libp2p_gossipsub_failed_publish.inc()
    return 0

  return g.publishMessage(topic, data, peers)
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
let peer = g.peers.getOrDefault(id) let peer = g.peers.getOrDefault(id)
if isNil(peer): if isNil(peer):

View File

@ -665,14 +665,16 @@ suite "GossipSub":
await sleepAsync(10.milliseconds) await sleepAsync(10.milliseconds)
check false check false
check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17 check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17
# Now try with a mesh # Now try with a mesh
gossip1.subscribe("foobar", handler) gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5 checkExpiring: gossip1.mesh.peers("foobar") > 5
# use a different length so that the message is not equal to the last # use a different length so that the message is not equal to the last
check (await nodes[0].publish("foobar", newSeq[byte](1_000_001))) == 17 check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17
await allFuturesThrowing( await allFuturesThrowing(
nodes.mapIt(it.switch.stop()) nodes.mapIt(it.switch.stop())