diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 4bbd65d..7716e89 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -32,7 +32,7 @@ const GossipSubCodec_10* = "/meshsub/1.0.0" # overlay parameters -const +const GossipSubD* = 6 GossipSubDlo* = 4 GossipSubDhi* = 12 @@ -41,17 +41,18 @@ const const GossipSubHistoryLength* = 5 GossipSubHistoryGossip* = 3 - GossipBackoffPeriod* = 1.minutes # heartbeat interval -const - GossipSubHeartbeatInitialDelay* = 100.millis GossipSubHeartbeatInterval* = 1.seconds # fanout ttl const GossipSubFanoutTTL* = 1.minutes +# gossip parameters +const + GossipBackoffPeriod* = 1.minutes + const BackoffSlackTime = 2 # seconds IWantPeerBudget = 25 # 25 messages per second ( reset every heartbeat ) @@ -107,10 +108,20 @@ type pruneBackoff*: Duration floodPublish*: bool gossipFactor*: float64 + d*: int + dLow*: int + dHigh*: int dScore*: int dOut*: int dLazy*: int + heartbeatInterval*: Duration + + historyLength*: int + historyGossip*: int + + fanoutTTL*: Duration + gossipThreshold*: float64 publishThreshold*: float64 graylistThreshold*: float64 @@ -174,9 +185,16 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = pruneBackoff: 1.minutes, floodPublish: true, gossipFactor: 0.25, - dScore: 4, - dOut: GossipSubDlo - 1, - dLazy: GossipSubD, + d: GossipSubD, + dLow: GossipSubDlo, + dHigh: GossipSubDhi, + dScore: GossipSubDlo, + dOut: GossipSubDlo - 1, # DLow - 1 + dLazy: GossipSubD, # Like D + heartbeatInterval: GossipSubHeartbeatInterval, + historyLength: GossipSubHistoryLength, + historyGossip: GossipSubHistoryGossip, + fanoutTTL: GossipSubFanoutTTL, gossipThreshold: -10, publishThreshold: -100, graylistThreshold: -10000, @@ -192,8 +210,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = - if (parameters.dOut >= GossipSubDlo) or - (parameters.dOut > (GossipSubD div 2)): + if (parameters.dOut >= parameters.dLow) or + (parameters.dOut > (parameters.d div 2)): err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2") elif parameters.gossipThreshold >= 0: err("gossipsub: gossipThreshold parameter error, Must be < 0") @@ -339,7 +357,7 @@ proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] = peers.keepIf do (x: PubSubPeer) -> bool: x.score >= 0.0 # by spec, larger then Dhi, but let's put some hard caps - peers.setLen(min(peers.len, GossipSubDhi * 2)) + peers.setLen(min(peers.len, g.parameters.dHigh * 2)) peers.map do (x: PubSubPeer) -> PeerInfoMsg: PeerInfoMsg(peerID: x.peerId.getBytes()) @@ -348,12 +366,12 @@ proc replenishFanout(g: GossipSub, topic: string) = logScope: topic trace "about to replenish fanout" - if g.fanout.peers(topic) < GossipSubDLo: + if g.fanout.peers(topic) < g.parameters.dLow: trace "replenishing fanout", peers = g.fanout.peers(topic) if topic in g.gossipsub: for peer in g.gossipsub[topic]: if g.fanout.addPeer(topic, peer): - if g.fanout.peers(topic) == GossipSubD: + if g.fanout.peers(topic) == g.parameters.d: break when defined(libp2p_expensive_metrics): @@ -392,7 +410,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = grafts, prunes, grafting: seq[PubSubPeer] let npeers = g.mesh.peers(topic) - if npeers < GossipSubDlo: + if npeers < g.parameters.dLow: trace "replenishing mesh", peers = g.mesh.peers(topic) # replenish the mesh if we're below Dlo grafts = toSeq( @@ -415,7 +433,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = grafts.sort(byScore, SortOrder.Descending) # Graft peers so we reach a count of D - grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic))) + grafts.setLen(min(grafts.len, g.parameters.d - g.mesh.peers(topic))) trace "grafting", grafts = grafts.len for peer in grafts: @@ -463,7 +481,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = grafting &= peer - if g.mesh.peers(topic) > GossipSubDhi: + if g.mesh.peers(topic) > g.parameters.dHigh: # prune peers if we've gone over Dhi prunes = toSeq(g.mesh[topic]) # avoid pruning peers we are currently grafting in this heartbeat @@ -489,7 +507,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = else: inbound &= peer - # ensure that there are at least D_out peers first and rebalance to GossipSubD after that + # ensure that there are at least D_out peers first and rebalance to g.d after that let maxOutboundPrunes = block: var count = 0 @@ -502,7 +520,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # concat remaining outbound peers inbound &= outbound - let pruneLen = inbound.len - GossipSubD + let pruneLen = inbound.len - g.parameters.d if pruneLen > 0: # Ok we got some peers to prune, # for this heartbeat let's prune those @@ -824,7 +842,7 @@ proc heartbeat(g: GossipSub) {.async.} = trace "firing heartbeat event", instance = cast[int](g) trigger.fire() - await sleepAsync(GossipSubHeartbeatInterval) + await sleepAsync(g.parameters.heartbeatInterval) method unsubscribePeer*(g: GossipSub, peer: PeerID) = ## handle peer disconnects @@ -971,7 +989,7 @@ proc handleGraft(g: GossipSub, # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. if topic in g.topics: - if g.mesh.peers(topic) < GossipSubDHi or peer.outbound: + if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound: # In the spec, there's no mention of DHi here, but implicitly, a # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out @@ -1251,7 +1269,7 @@ method publish*(g: GossipSub, # on the topic, so it makes sense # to update the last topic publish # time - g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL) if peers.len == 0: debug "No peers for topic, skipping publish" @@ -1333,7 +1351,7 @@ method initPubSub*(g: GossipSub) = g.parameters.validateParameters().tryGet() randomize() - g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength) + g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 04f93f1..0679631 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -56,7 +56,7 @@ suite "GossipSub internal": check gossipSub.peers.len == 15 await gossipSub.rebalanceMesh(topic) - check gossipSub.mesh[topic].len == GossipSubD # + 2 # account opportunistic grafts + check gossipSub.mesh[topic].len == gossipSub.parameters.d # + 2 # account opportunistic grafts await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -83,7 +83,7 @@ suite "GossipSub internal": check gossipSub.mesh[topic].len == 15 await gossipSub.rebalanceMesh(topic) - check gossipSub.mesh[topic].len == GossipSubD + gossipSub.parameters.dScore + check gossipSub.mesh[topic].len == gossipSub.parameters.d + gossipSub.parameters.dScore await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -111,7 +111,7 @@ suite "GossipSub internal": check gossipSub.gossipsub[topic].len == 15 gossipSub.replenishFanout(topic) - check gossipSub.fanout[topic].len == GossipSubD + check gossipSub.fanout[topic].len == gossipSub.parameters.d await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -139,7 +139,7 @@ suite "GossipSub internal": peer.handler = handler gossipSub.fanout[topic].incl(peer) - check gossipSub.fanout[topic].len == GossipSubD + check gossipSub.fanout[topic].len == gossipSub.parameters.d gossipSub.dropFanoutPeers() check topic notin gossipSub.fanout @@ -175,8 +175,8 @@ suite "GossipSub internal": gossipSub.fanout[topic1].incl(peer) gossipSub.fanout[topic2].incl(peer) - check gossipSub.fanout[topic1].len == GossipSubD - check gossipSub.fanout[topic2].len == GossipSubD + check gossipSub.fanout[topic1].len == gossipSub.parameters.d + check gossipSub.fanout[topic2].len == gossipSub.parameters.d gossipSub.dropFanoutPeers() check topic1 notin gossipSub.fanout @@ -240,7 +240,7 @@ suite "GossipSub internal": check gossipSub.gossipsub[topic].len == 15 let peers = gossipSub.getGossipPeers() - check peers.len == GossipSubD + check peers.len == gossipSub.parameters.d for p in peers.keys: check not gossipSub.fanout.hasPeerID(topic, p.peerId) check not gossipSub.mesh.hasPeerID(topic, p.peerId) @@ -284,7 +284,7 @@ suite "GossipSub internal": gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() - check peers.len == GossipSubD + check peers.len == gossipSub.parameters.d await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -326,7 +326,7 @@ suite "GossipSub internal": gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() - check peers.len == GossipSubD + check peers.len == gossipSub.parameters.d await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop()