use var semantics to optimize table access

This commit is contained in:
Giovanni Petrantoni 2020-06-27 13:35:07 +09:00
parent 7a95f1844b
commit 1a2f336eb5

View File

@ -73,57 +73,60 @@ method init*(g: GossipSub) =
g.handler = handler g.handler = handler
g.codec = GossipSubCodec g.codec = GossipSubCodec
proc replenishFanout(g: GossipSub, topic: string) {.async.} = proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic ## get fanout peers for a topic
trace "about to replenish fanout" debug "about to replenish fanout", topic
if topic notin g.fanout:
g.fanout[topic] = initHashSet[string]()
if g.fanout.getOrDefault(topic).len < GossipSubDLo: var topicHash = g.fanout.mgetOrPut(topic, initHashSet[string]())
trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len
if topic in g.gossipsub: if topicHash.len < GossipSubDLo:
for p in g.gossipsub.getOrDefault(topic): debug "replenishing fanout", peers = topicHash.len
if not g.fanout[topic].containsOrIncl(p): let peers = g.gossipsub.getOrDefault(topic)
if g.fanout.getOrDefault(topic).len == GossipSubD: for p in peers:
if not topicHash.containsOrIncl(p):
# set the fanout expiry time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
if topicHash.len == GossipSubD:
break break
libp2p_gossipsub_peers_per_topic_fanout libp2p_gossipsub_peers_per_topic_fanout.set(topicHash.len.int64, labelValues = [topic])
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) debug "fanout replenished with peers", peers = topicHash.len
trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
try: try:
trace "about to rebalance mesh" trace "about to rebalance mesh"
# create a mesh topic that we're subscribing to
if topic notin g.mesh:
g.mesh[topic] = initHashSet[string]()
if g.mesh.getOrDefault(topic).len < GossipSubDlo: var
topicHash = g.mesh.mgetOrPut(topic, initHashSet[string]())
fanOutHash = g.fanout.mgetOrPut(topic, initHashSet[string]())
gossipHash = g.gossipsub.mgetOrPut(topic, initHashSet[string]())
if topicHash.len < GossipSubDlo:
trace "replenishing mesh", topic trace "replenishing mesh", topic
# replenish the mesh if we're below GossipSubDlo # replenish the mesh if we're below GossipSubDlo
while g.mesh.getOrDefault(topic).len < GossipSubD: while topicHash.len < GossipSubD:
trace "gathering peers", peers = g.mesh.getOrDefault(topic).len trace "gathering peers", peers = topicHash.len
await sleepAsync(1.millis) # don't starve the event loop await sleepAsync(1.millis) # don't starve the event loop
var id: string var id: string
if topic in g.fanout and g.fanout.getOrDefault(topic).len > 0: if fanOutHash.len > 0:
trace "getting peer from fanout", topic, trace "getting peer from fanout", topic,
peers = g.fanout.getOrDefault(topic).len peers = fanOutHash.len
id = sample(toSeq(g.fanout.getOrDefault(topic))) id = sample(toSeq(fanOutHash))
g.fanout[topic].excl(id) fanOutHash.excl(id)
if id in g.fanout[topic]: if id in fanOutHash:
continue # we already have this peer in the mesh, try again continue # we already have this peer in the mesh, try again
trace "got fanout peer", peer = id trace "got fanout peer", peer = id
elif topic in g.gossipsub and g.gossipsub.getOrDefault(topic).len > 0: elif gossipHash.len > 0:
trace "getting peer from gossipsub", topic, trace "getting peer from gossipsub", topic,
peers = g.gossipsub.getOrDefault(topic).len peers = gossipHash.len
id = sample(toSeq(g.gossipsub[topic])) id = sample(toSeq(gossipHash))
g.gossipsub[topic].excl(id) gossipHash.excl(id)
if id in g.mesh[topic]: if id in topicHash:
continue # we already have this peer in the mesh, try again continue # we already have this peer in the mesh, try again
trace "got gossipsub peer", peer = id trace "got gossipsub peer", peer = id
@ -131,34 +134,34 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "no more peers" trace "no more peers"
break break
g.mesh[topic].incl(id) topicHash.incl(id)
if id in g.peers: if id in g.peers:
let p = g.peers[id] let p = g.peers[id]
# send a graft message to the peer # send a graft message to the peer
await p.sendGraft(@[topic]) await p.sendGraft(@[topic])
# prune peers if we've gone over # prune peers if we've gone over
if g.mesh.getOrDefault(topic).len > GossipSubDhi: if topicHash.len > GossipSubDhi:
trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len trace "about to prune mesh", mesh = topicHash.len
while g.mesh.getOrDefault(topic).len > GossipSubD: while topicHash.len > GossipSubD:
trace "pruning peers", peers = g.mesh[topic].len trace "pruning peers", peers = topicHash.len
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)] let id = toSeq(topicHash)[rand(0..<topicHash.len)]
g.mesh[topic].excl(id) topicHash.excl(id)
let p = g.peers[id] let p = g.peers[id]
# send a graft message to the peer # send a graft message to the peer
await p.sendPrune(@[topic]) await p.sendPrune(@[topic])
libp2p_gossipsub_peers_per_topic_gossipsub libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) .set(gossipHash.len.int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) .set(fanOutHash.len.int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.getOrDefault(topic).len.int64, labelValues = [topic]) .set(topicHash.len.int64, labelValues = [topic])
trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len, trace "mesh balanced, got peers", peers = topicHash.len,
topicId = topic topicId = topic
except CatchableError as exc: except CatchableError as exc:
trace "exception occurred re-balancing mesh", exc = exc.msg trace "exception occurred re-balancing mesh", exc = exc.msg