getGossipPeers fixes

This commit is contained in:
Giovanni Petrantoni 2020-07-01 17:26:42 +09:00
parent a976c22dae
commit d9e0ca6091
1 changed files with 15 additions and 16 deletions

View File

@ -135,7 +135,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
let p = g.peers[id]
# send a graft message to the peer
await p.sendGraft(@[topic])
# prune peers if we've gone over
if g.mesh.getOrDefault(topic).len > GossipSubDhi:
trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len
@ -178,6 +178,11 @@ proc dropFanoutPeers(g: GossipSub) {.async.} =
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
proc shuffle[T](collection: var seq[T]) =
for i in 0..collection.high:
let r = rand(collection.high)
swap(collection[i], collection[r])
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
## gossip iHave messages to peers
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
@ -186,28 +191,22 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
let fanout: HashSet[string] = g.fanout.getOrDefault(topic)
let gossipPeers = mesh + fanout
var extraPeers = g.gossipsub # copy it!
let mids = g.mcache.window(topic)
if mids.len > 0:
let ihave = ControlIHave(topicID: topic,
messageIDs: toSeq(mids))
if topic notin extraPeers:
if topic notin g.gossipsub:
trace "topic not in gossip array, skipping", topicID = topic
continue
while result.len < GossipSubD:
if extraPeers.getOrDefault(topic).len == 0:
trace "no peers for topic, skipping", topicID = topic
break
let id = toSeq(extraPeers.getOrDefault(topic)).sample()
if id in extraPeers.getOrDefault(topic):
extraPeers[topic].excl(id)
if id notin gossipPeers:
if id notin result:
result[id] = ControlMessage()
result[id].ihave.add(ihave)
var extraPeers = toSeq(g.gossipsub[topic])
shuffle(extraPeers)
for peer in extraPeers:
if result.len < GossipSubD and
peer notin gossipPeers and
peer notin result:
result[peer] = ControlMessage(ihave: @[ihave])
proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning: