Gossip runtime params (#437)
* move gossip parameters to runtime * internal test fixes * add missing params * restore const parameters are soldi base and use them in init * more constants tuning
This commit is contained in:
parent
92fa4110c1
commit
93b6c4dc52
|
@ -32,7 +32,7 @@ const
|
|||
GossipSubCodec_10* = "/meshsub/1.0.0"
|
||||
|
||||
# overlay parameters
|
||||
const
|
||||
const
|
||||
GossipSubD* = 6
|
||||
GossipSubDlo* = 4
|
||||
GossipSubDhi* = 12
|
||||
|
@ -41,17 +41,18 @@ const
|
|||
const
|
||||
GossipSubHistoryLength* = 5
|
||||
GossipSubHistoryGossip* = 3
|
||||
GossipBackoffPeriod* = 1.minutes
|
||||
|
||||
# heartbeat interval
|
||||
const
|
||||
GossipSubHeartbeatInitialDelay* = 100.millis
|
||||
GossipSubHeartbeatInterval* = 1.seconds
|
||||
|
||||
# fanout ttl
|
||||
const
|
||||
GossipSubFanoutTTL* = 1.minutes
|
||||
|
||||
# gossip parameters
|
||||
const
|
||||
GossipBackoffPeriod* = 1.minutes
|
||||
|
||||
const
|
||||
BackoffSlackTime = 2 # seconds
|
||||
IWantPeerBudget = 25 # 25 messages per second ( reset every heartbeat )
|
||||
|
@ -107,10 +108,20 @@ type
|
|||
pruneBackoff*: Duration
|
||||
floodPublish*: bool
|
||||
gossipFactor*: float64
|
||||
d*: int
|
||||
dLow*: int
|
||||
dHigh*: int
|
||||
dScore*: int
|
||||
dOut*: int
|
||||
dLazy*: int
|
||||
|
||||
heartbeatInterval*: Duration
|
||||
|
||||
historyLength*: int
|
||||
historyGossip*: int
|
||||
|
||||
fanoutTTL*: Duration
|
||||
|
||||
gossipThreshold*: float64
|
||||
publishThreshold*: float64
|
||||
graylistThreshold*: float64
|
||||
|
@ -174,9 +185,16 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
|||
pruneBackoff: 1.minutes,
|
||||
floodPublish: true,
|
||||
gossipFactor: 0.25,
|
||||
dScore: 4,
|
||||
dOut: GossipSubDlo - 1,
|
||||
dLazy: GossipSubD,
|
||||
d: GossipSubD,
|
||||
dLow: GossipSubDlo,
|
||||
dHigh: GossipSubDhi,
|
||||
dScore: GossipSubDlo,
|
||||
dOut: GossipSubDlo - 1, # DLow - 1
|
||||
dLazy: GossipSubD, # Like D
|
||||
heartbeatInterval: GossipSubHeartbeatInterval,
|
||||
historyLength: GossipSubHistoryLength,
|
||||
historyGossip: GossipSubHistoryGossip,
|
||||
fanoutTTL: GossipSubFanoutTTL,
|
||||
gossipThreshold: -10,
|
||||
publishThreshold: -100,
|
||||
graylistThreshold: -10000,
|
||||
|
@ -192,8 +210,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
|||
)
|
||||
|
||||
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
|
||||
if (parameters.dOut >= GossipSubDlo) or
|
||||
(parameters.dOut > (GossipSubD div 2)):
|
||||
if (parameters.dOut >= parameters.dLow) or
|
||||
(parameters.dOut > (parameters.d div 2)):
|
||||
err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2")
|
||||
elif parameters.gossipThreshold >= 0:
|
||||
err("gossipsub: gossipThreshold parameter error, Must be < 0")
|
||||
|
@ -339,7 +357,7 @@ proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] =
|
|||
peers.keepIf do (x: PubSubPeer) -> bool:
|
||||
x.score >= 0.0
|
||||
# by spec, larger then Dhi, but let's put some hard caps
|
||||
peers.setLen(min(peers.len, GossipSubDhi * 2))
|
||||
peers.setLen(min(peers.len, g.parameters.dHigh * 2))
|
||||
peers.map do (x: PubSubPeer) -> PeerInfoMsg:
|
||||
PeerInfoMsg(peerID: x.peerId.getBytes())
|
||||
|
||||
|
@ -348,12 +366,12 @@ proc replenishFanout(g: GossipSub, topic: string) =
|
|||
logScope: topic
|
||||
trace "about to replenish fanout"
|
||||
|
||||
if g.fanout.peers(topic) < GossipSubDLo:
|
||||
if g.fanout.peers(topic) < g.parameters.dLow:
|
||||
trace "replenishing fanout", peers = g.fanout.peers(topic)
|
||||
if topic in g.gossipsub:
|
||||
for peer in g.gossipsub[topic]:
|
||||
if g.fanout.addPeer(topic, peer):
|
||||
if g.fanout.peers(topic) == GossipSubD:
|
||||
if g.fanout.peers(topic) == g.parameters.d:
|
||||
break
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
|
@ -392,7 +410,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
grafts, prunes, grafting: seq[PubSubPeer]
|
||||
|
||||
let npeers = g.mesh.peers(topic)
|
||||
if npeers < GossipSubDlo:
|
||||
if npeers < g.parameters.dLow:
|
||||
trace "replenishing mesh", peers = g.mesh.peers(topic)
|
||||
# replenish the mesh if we're below Dlo
|
||||
grafts = toSeq(
|
||||
|
@ -415,7 +433,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
grafts.sort(byScore, SortOrder.Descending)
|
||||
|
||||
# Graft peers so we reach a count of D
|
||||
grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
|
||||
grafts.setLen(min(grafts.len, g.parameters.d - g.mesh.peers(topic)))
|
||||
|
||||
trace "grafting", grafts = grafts.len
|
||||
for peer in grafts:
|
||||
|
@ -463,7 +481,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
grafting &= peer
|
||||
|
||||
|
||||
if g.mesh.peers(topic) > GossipSubDhi:
|
||||
if g.mesh.peers(topic) > g.parameters.dHigh:
|
||||
# prune peers if we've gone over Dhi
|
||||
prunes = toSeq(g.mesh[topic])
|
||||
# avoid pruning peers we are currently grafting in this heartbeat
|
||||
|
@ -489,7 +507,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
else:
|
||||
inbound &= peer
|
||||
|
||||
# ensure that there are at least D_out peers first and rebalance to GossipSubD after that
|
||||
# ensure that there are at least D_out peers first and rebalance to g.d after that
|
||||
let maxOutboundPrunes =
|
||||
block:
|
||||
var count = 0
|
||||
|
@ -502,7 +520,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
# concat remaining outbound peers
|
||||
inbound &= outbound
|
||||
|
||||
let pruneLen = inbound.len - GossipSubD
|
||||
let pruneLen = inbound.len - g.parameters.d
|
||||
if pruneLen > 0:
|
||||
# Ok we got some peers to prune,
|
||||
# for this heartbeat let's prune those
|
||||
|
@ -824,7 +842,7 @@ proc heartbeat(g: GossipSub) {.async.} =
|
|||
trace "firing heartbeat event", instance = cast[int](g)
|
||||
trigger.fire()
|
||||
|
||||
await sleepAsync(GossipSubHeartbeatInterval)
|
||||
await sleepAsync(g.parameters.heartbeatInterval)
|
||||
|
||||
method unsubscribePeer*(g: GossipSub, peer: PeerID) =
|
||||
## handle peer disconnects
|
||||
|
@ -971,7 +989,7 @@ proc handleGraft(g: GossipSub,
|
|||
# If they send us a graft before they send us a subscribe, what should
|
||||
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
||||
if topic in g.topics:
|
||||
if g.mesh.peers(topic) < GossipSubDHi or peer.outbound:
|
||||
if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound:
|
||||
# In the spec, there's no mention of DHi here, but implicitly, a
|
||||
# peer will be removed from the mesh on next rebalance, so we don't want
|
||||
# this peer to push someone else out
|
||||
|
@ -1251,7 +1269,7 @@ method publish*(g: GossipSub,
|
|||
# on the topic, so it makes sense
|
||||
# to update the last topic publish
|
||||
# time
|
||||
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
|
||||
g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
|
||||
|
||||
if peers.len == 0:
|
||||
debug "No peers for topic, skipping publish"
|
||||
|
@ -1333,7 +1351,7 @@ method initPubSub*(g: GossipSub) =
|
|||
g.parameters.validateParameters().tryGet()
|
||||
|
||||
randomize()
|
||||
g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength)
|
||||
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
|
||||
g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer
|
||||
g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer
|
||||
g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers
|
||||
|
|
|
@ -56,7 +56,7 @@ suite "GossipSub internal":
|
|||
|
||||
check gossipSub.peers.len == 15
|
||||
await gossipSub.rebalanceMesh(topic)
|
||||
check gossipSub.mesh[topic].len == GossipSubD # + 2 # account opportunistic grafts
|
||||
check gossipSub.mesh[topic].len == gossipSub.parameters.d # + 2 # account opportunistic grafts
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
@ -83,7 +83,7 @@ suite "GossipSub internal":
|
|||
|
||||
check gossipSub.mesh[topic].len == 15
|
||||
await gossipSub.rebalanceMesh(topic)
|
||||
check gossipSub.mesh[topic].len == GossipSubD + gossipSub.parameters.dScore
|
||||
check gossipSub.mesh[topic].len == gossipSub.parameters.d + gossipSub.parameters.dScore
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
@ -111,7 +111,7 @@ suite "GossipSub internal":
|
|||
|
||||
check gossipSub.gossipsub[topic].len == 15
|
||||
gossipSub.replenishFanout(topic)
|
||||
check gossipSub.fanout[topic].len == GossipSubD
|
||||
check gossipSub.fanout[topic].len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
@ -139,7 +139,7 @@ suite "GossipSub internal":
|
|||
peer.handler = handler
|
||||
gossipSub.fanout[topic].incl(peer)
|
||||
|
||||
check gossipSub.fanout[topic].len == GossipSubD
|
||||
check gossipSub.fanout[topic].len == gossipSub.parameters.d
|
||||
|
||||
gossipSub.dropFanoutPeers()
|
||||
check topic notin gossipSub.fanout
|
||||
|
@ -175,8 +175,8 @@ suite "GossipSub internal":
|
|||
gossipSub.fanout[topic1].incl(peer)
|
||||
gossipSub.fanout[topic2].incl(peer)
|
||||
|
||||
check gossipSub.fanout[topic1].len == GossipSubD
|
||||
check gossipSub.fanout[topic2].len == GossipSubD
|
||||
check gossipSub.fanout[topic1].len == gossipSub.parameters.d
|
||||
check gossipSub.fanout[topic2].len == gossipSub.parameters.d
|
||||
|
||||
gossipSub.dropFanoutPeers()
|
||||
check topic1 notin gossipSub.fanout
|
||||
|
@ -240,7 +240,7 @@ suite "GossipSub internal":
|
|||
check gossipSub.gossipsub[topic].len == 15
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == GossipSubD
|
||||
check peers.len == gossipSub.parameters.d
|
||||
for p in peers.keys:
|
||||
check not gossipSub.fanout.hasPeerID(topic, p.peerId)
|
||||
check not gossipSub.mesh.hasPeerID(topic, p.peerId)
|
||||
|
@ -284,7 +284,7 @@ suite "GossipSub internal":
|
|||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == GossipSubD
|
||||
check peers.len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
@ -326,7 +326,7 @@ suite "GossipSub internal":
|
|||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == GossipSubD
|
||||
check peers.len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
|
Loading…
Reference in New Issue