rebalance improvements and opportunistic grafting
commit 296ca14133 (parent f7fee2ca97)
@@ -37,6 +37,7 @@ const
   GossipSubD* = 6
   GossipSubDlo* = 4
   GossipSubDhi* = 12
+  GossipSubDout* = 3

 # gossip parameters
 const
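
For context, these are the gossipsub mesh degree parameters: GossipSubD is the target mesh size per topic, GossipSubDlo and GossipSubDhi are the low and high watermarks that trigger grafting and pruning, and the newly added GossipSubDout reserves a minimum number of outbound connections in the mesh (a gossipsub v1.1 parameter). A minimal sketch of how the watermarks drive rebalanceMesh, with meshLen standing in for g.mesh.peers(topic) and the proc being illustrative only:

const
  GossipSubD = 6    # target mesh degree
  GossipSubDlo = 4  # low watermark: graft below this
  GossipSubDhi = 12 # high watermark: prune above this

proc rebalanceAction(meshLen: int): string =
  # Illustrative only: how the watermarks decide between graft and prune.
  if meshLen < GossipSubDlo:
    "graft " & $(GossipSubD - meshLen) & " peer(s) to reach D"
  elif meshLen > GossipSubDhi:
    "prune " & $(meshLen - GossipSubD) & " peer(s) back to D"
  else:
    "mesh within [Dlo, Dhi], nothing to do"

echo rebalanceAction(3)   # graft 3 peer(s) to reach D
echo rebalanceAction(13)  # prune 7 peer(s) back to D
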
@@ -127,6 +128,7 @@ type
     gossipsub*: PeerTable # peers that are subscribed to a topic
     explicit*: PeerTable # direct peers that we keep alive explicitly
     explicitPeers*: HashSet[PeerID] # explicit (always connected/forward) peers
+    backingOff*: HashSet[PeerID] # peers we are backing off from; not to be re-grafted yet
     lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
     gossip*: Table[string, seq[ControlIHave]] # pending gossip
     control*: Table[string, ControlMessage] # pending control messages
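
The new backingOff set is consulted below when selecting graft candidates, but this diff does not show where peers enter or leave it; presumably a peer is added when pruned and released after a backoff period. A minimal sketch of that bookkeeping under those assumptions (the duration, the expiry table, and both proc names are hypothetical, not from this commit):

import std/[sets, tables, times]

type PeerID = string  # stand-in for the real PeerID type

var
  backingOff: HashSet[PeerID]
  backoffExpiry: Table[PeerID, Time]  # hypothetical companion table

const BackoffPeriod = initDuration(minutes = 1)  # assumed duration

proc startBackoff(p: PeerID) =
  # Called when pruning `p`: avoid re-grafting it for a while.
  backingOff.incl(p)
  backoffExpiry[p] = getTime() + BackoffPeriod

proc expireBackoffs() =
  # Called periodically (e.g. from the heartbeat) to release peers.
  let now = getTime()
  var released: seq[PeerID]
  for p, expiry in backoffExpiry:
    if now >= expiry: released.add(p)
  for p in released:
    backingOff.excl(p)
    backoffExpiry.del(p)
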
@@ -349,7 +351,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
   # create a mesh topic that we're subscribing to

   var
-    grafts, prunes: seq[PubSubPeer]
+    grafts, prunes, grafting: seq[PubSubPeer]

   if g.mesh.peers(topic) < GossipSubDlo:
     trace "replenishing mesh", peers = g.mesh.peers(topic)
@@ -363,16 +365,22 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
       meshPeers = g.mesh.peers(topic)
       grafts = grafts.len

     shuffle(grafts)
     grafts.keepIf do (x: PubSubPeer) -> bool:
       # avoid negative score peers
       x.score >= 0.0 and
       # don't pick explicit peers
       x.peerId notin g.explicitPeers and
       # and avoid peers we are backing off
       x.peerId notin g.backingOff

     # sort peers by score
-    grafts.sort(proc (x, y: PubSubPeer): int =
+    grafts.sort do (x, y: PubSubPeer) -> int:
       let
         peerx = x.score
         peery = y.score
       if peerx < peery: -1
       elif peerx == peery: 0
-      else: 1)
+      else: 1

     # Graft peers so we reach a count of D
     grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
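
The hand-written comparator above is an ascending sort by score; the standard cmp expresses the same ordering more compactly, as in this reduced sketch (ScoredPeer is a stand-in, not the real PubSubPeer):

import std/algorithm

type ScoredPeer = object
  score: float64

var grafts = @[ScoredPeer(score: 2.5), ScoredPeer(score: -1.0),
               ScoredPeer(score: 0.5)]

# Same ordering as the hand-written comparator: ascending by score.
grafts.sort do (x, y: ScoredPeer) -> int:
  cmp(x.score, y.score)

assert grafts[0].score == -1.0  # lowest score ends up first

Worth noting: since grafts.setLen(...) keeps the front of the sequence, the ascending order retains the lowest-scored candidates; if the intent is to graft the best-scored peers, a descending sort (e.g. sort(..., SortOrder.Descending)) would be needed.
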
@@ -383,6 +391,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
       if g.mesh.addPeer(topic, peer):
         g.grafted(peer, topic)
         g.fanout.removePeer(topic, peer)
+        grafting &= peer

   if g.mesh.peers(topic) > GossipSubDhi:
     # prune peers if we've gone over Dhi
@@ -395,6 +404,39 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
       g.pruned(peer, topic)
       g.mesh.removePeer(topic, peer)

+  # opportunistic grafting; per the spec the mesh should not be empty here
+  if g.mesh.peers(topic) > 1:
+    var peers = toSeq(g.mesh[topic])
+    peers.sort do (x, y: PubSubPeer) -> int:
+      let
+        peerx = x.score
+        peery = y.score
+      if peerx < peery: -1
+      elif peerx == peery: 0
+      else: 1
+    let medianIdx = peers.len div 2
+    let median = peers[medianIdx]
+    if median.score < g.parameters.opportunisticGraftThreshold:
+      trace "median score below opportunistic threshold", score = median.score
+      var avail = toSeq(
+        g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
+        g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
+      )
+
+      avail.keepIf do (x: PubSubPeer) -> bool:
+        # only pick peers scoring at least as well as the median
+        x.score >= median.score and
+        # don't pick explicit peers
+        x.peerId notin g.explicitPeers and
+        # and avoid peers we are backing off
+        x.peerId notin g.backingOff
+
+      for peer in avail:
+        if g.mesh.addPeer(topic, peer):
+          g.grafted(peer, topic)
+          grafting &= peer
+          trace "opportunistic grafting", peer = $peer
+
   when defined(libp2p_expensive_metrics):
     libp2p_gossipsub_peers_per_topic_gossipsub
       .set(g.gossipsub.peers(topic).int64, labelValues = [topic])
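
The opportunistic-grafting check sorts the current mesh ascending by score and inspects the element at len div 2 (the upper median for even lengths); only when that median falls below opportunisticGraftThreshold are better-scored peers from outside the mesh pulled in. A self-contained illustration with made-up scores and an assumed threshold value:

import std/algorithm

var meshScores = @[0.1, -2.0, 3.0, 0.5, -0.3]  # illustrative mesh scores
meshScores.sort()  # ascending: @[-2.0, -0.3, 0.1, 0.5, 3.0]

let medianIdx = meshScores.len div 2  # 2
let median = meshScores[medianIdx]    # 0.1

const OpportunisticGraftThreshold = 1.0  # assumed value, not from the diff

if median < OpportunisticGraftThreshold:
  echo "median ", median, " below threshold; graft outside peers scoring >= ", median

Note that opportunistically grafted peers are appended to grafting, so the GRAFT control message broadcast at the end of rebalanceMesh (next hunk) covers them together with the Dlo replenishment grafts.
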
@@ -408,7 +450,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
   # Send changes to peers after table updates to avoid stale state
   let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
   let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-  discard await g.broadcast(grafts, graft, DefaultSendTimeout)
+  discard await g.broadcast(grafting, graft, DefaultSendTimeout)
   discard await g.broadcast(prunes, prune, DefaultSendTimeout)

   trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
@@ -478,7 +520,7 @@ proc updateScores(g: GossipSub) = # avoid async
   var evicting: seq[PubSubPeer]

   for peer, stats in g.peerStats.mpairs:
-    trace "updating peer score", peer, gossipTopics = peer.topics.len
+    trace "updating peer score", peer

     if not peer.connected:
       if now > stats.expire:
@@ -956,10 +998,11 @@ method publish*(g: GossipSub,
     return 0

   if g.parameters.floodPublish:
-    for id, peer in g.peers:
-      if topic in peer.topics and
-         peer.score >= g.parameters.publishThreshold:
-        trace "publish: including flood/high score peer", peer = id
-        peers.incl(peer)
+    # With flood publishing enabled, the mesh is used when propagating messages from other peers,
+    # but a peer's own messages will always be published to all known peers in the topic.
+    for peer in g.gossipsub.getOrDefault(topic):
+      if peer.score >= g.parameters.publishThreshold:
+        trace "publish: including flood/high score peer", peer = $peer
+        peers.incl(peer)

   # add always direct peers
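
Flood publishing, per the new comment, only affects a peer's own messages: every known subscriber of the topic whose score clears publishThreshold becomes a publish target, regardless of mesh membership. A reduced sketch of that selection (types and numbers are illustrative):

import std/sequtils

type Peer = object
  id: string
  score: float64

let publishThreshold = -10.0  # illustrative; the real value is a parameter
let subscribers = @[Peer(id: "a", score: 5.0),
                    Peer(id: "b", score: -50.0),
                    Peer(id: "c", score: 0.0)]

# Every subscriber above the threshold is included, not just mesh members.
let floodTargets = subscribers.filterIt(it.score >= publishThreshold)
assert floodTargets.mapIt(it.id) == @["a", "c"]
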
@@ -126,12 +126,7 @@ method subscribeTopic*(p: PubSub,
                        subscribe: bool,
                        peerId: PeerID) {.base, async.} =
   # called when remote peer subscribes to a topic
-  var peer = p.peers.getOrDefault(peerId)
-  if not isNil(peer):
-    if subscribe:
-      peer.topics.incl(topic)
-    else:
-      peer.topics.excl(topic)
+  discard

 method rpcHandler*(p: PubSub,
                    peer: PubSubPeer,
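
The remaining hunks drop per-peer topic tracking: the base subscribeTopic becomes a no-op for subclasses to override, and the topics field disappears from PubSubPeer below. Topic membership is presumably looked up through the per-topic tables instead (as publish now does via g.gossipsub.getOrDefault(topic)). A minimal sketch of that per-topic bookkeeping, with PeerID reduced to a string:

import std/[sets, tables]

type PeerID = string

# Membership tracked per topic (like the gossipsub PeerTable), rather than
# as a `topics: HashSet[string]` field on each peer.
var gossipsub: Table[string, HashSet[PeerID]]

proc subscribeTopic(topic: string, subscribe: bool, peer: PeerID) =
  if subscribe:
    gossipsub.mgetOrPut(topic, initHashSet[PeerID]()).incl(peer)
  else:
    gossipsub.withValue(topic, peersInTopic):
      peersInTopic[].excl(peer)

subscribeTopic("blocks", true, "peerA")
assert "peerA" in gossipsub["blocks"]
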
@@ -42,7 +42,6 @@ type
     sendConn: Connection
     peerId*: PeerID
     handler*: RPCHandler
-    topics*: HashSet[string]
     sentRpcCache: TimedCache[string] # cache for already sent messages
     recvdRpcCache: TimedCache[string] # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
@@ -209,5 +208,4 @@ proc newPubSubPeer*(peerId: PeerID,
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
   result.recvdRpcCache = newTimedCache[string](2.minutes)
-  result.topics = initHashSet[string]()
   result.sendLock = newAsyncLock()