From 1a2f336eb580ef3dad4c7e393c4c2b561dc59648 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Sat, 27 Jun 2020 13:35:07 +0900 Subject: [PATCH] use var semantics to optimize table access --- libp2p/protocols/pubsub/gossipsub.nim | 83 ++++++++++++++------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 85bff0f59..3d3fc366a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -73,57 +73,60 @@ method init*(g: GossipSub) = g.handler = handler g.codec = GossipSubCodec -proc replenishFanout(g: GossipSub, topic: string) {.async.} = +proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic - trace "about to replenish fanout" - if topic notin g.fanout: - g.fanout[topic] = initHashSet[string]() + debug "about to replenish fanout", topic - if g.fanout.getOrDefault(topic).len < GossipSubDLo: - trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len - if topic in g.gossipsub: - for p in g.gossipsub.getOrDefault(topic): - if not g.fanout[topic].containsOrIncl(p): - if g.fanout.getOrDefault(topic).len == GossipSubD: + var topicHash = g.fanout.mgetOrPut(topic, initHashSet[string]()) + + if topicHash.len < GossipSubDLo: + debug "replenishing fanout", peers = topicHash.len + let peers = g.gossipsub.getOrDefault(topic) + for p in peers: + if not topicHash.containsOrIncl(p): + # set the fanout expiry time + g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + if topicHash.len == GossipSubD: break - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) - trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len + libp2p_gossipsub_peers_per_topic_fanout.set(topicHash.len.int64, labelValues = [topic]) + debug "fanout replenished with peers", peers = topicHash.len proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = try: trace "about to rebalance mesh" - # create a mesh topic that we're subscribing to - if topic notin g.mesh: - g.mesh[topic] = initHashSet[string]() - if g.mesh.getOrDefault(topic).len < GossipSubDlo: + var + topicHash = g.mesh.mgetOrPut(topic, initHashSet[string]()) + fanOutHash = g.fanout.mgetOrPut(topic, initHashSet[string]()) + gossipHash = g.gossipsub.mgetOrPut(topic, initHashSet[string]()) + + if topicHash.len < GossipSubDlo: trace "replenishing mesh", topic # replenish the mesh if we're below GossipSubDlo - while g.mesh.getOrDefault(topic).len < GossipSubD: - trace "gathering peers", peers = g.mesh.getOrDefault(topic).len + while topicHash.len < GossipSubD: + trace "gathering peers", peers = topicHash.len await sleepAsync(1.millis) # don't starve the event loop var id: string - if topic in g.fanout and g.fanout.getOrDefault(topic).len > 0: + if fanOutHash.len > 0: trace "getting peer from fanout", topic, - peers = g.fanout.getOrDefault(topic).len + peers = fanOutHash.len - id = sample(toSeq(g.fanout.getOrDefault(topic))) - g.fanout[topic].excl(id) + id = sample(toSeq(fanOutHash)) + fanOutHash.excl(id) - if id in g.fanout[topic]: + if id in fanOutHash: continue # we already have this peer in the mesh, try again trace "got fanout peer", peer = id - elif topic in g.gossipsub and g.gossipsub.getOrDefault(topic).len > 0: + elif gossipHash.len > 0: trace "getting peer from gossipsub", topic, - peers = g.gossipsub.getOrDefault(topic).len + peers = gossipHash.len - id = sample(toSeq(g.gossipsub[topic])) - g.gossipsub[topic].excl(id) + id = sample(toSeq(gossipHash)) + gossipHash.excl(id) - if id in g.mesh[topic]: + if id in topicHash: continue # we already have this peer in the mesh, try again trace "got gossipsub peer", peer = id @@ -131,34 +134,34 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "no more peers" break - g.mesh[topic].incl(id) + topicHash.incl(id) if id in g.peers: let p = g.peers[id] # send a graft message to the peer await p.sendGraft(@[topic]) # prune peers if we've gone over - if g.mesh.getOrDefault(topic).len > GossipSubDhi: - trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len - while g.mesh.getOrDefault(topic).len > GossipSubD: - trace "pruning peers", peers = g.mesh[topic].len - let id = toSeq(g.mesh[topic])[rand(0.. GossipSubDhi: + trace "about to prune mesh", mesh = topicHash.len + while topicHash.len > GossipSubD: + trace "pruning peers", peers = topicHash.len + let id = toSeq(topicHash)[rand(0..