From 66c9cd2c8c25d1e34523f552d00cea6a2e487292 Mon Sep 17 00:00:00 2001
From: Giovanni Petrantoni
Date: Sat, 8 Aug 2020 18:27:57 +0900
Subject: [PATCH] score fixes

---
 libp2p/protocols/pubsub/gossipsub.nim | 46 +++++++++++++++++----------
 tests/pubsub/testgossipsub.nim        |  4 +++
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 57b84a9c9..5236a6051 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -218,7 +218,7 @@ proc init*(_: type[TopicParams]): TopicParams =
     meshMessageDeliveriesWeight: -1.0,
     meshMessageDeliveriesDecay: 0.5,
     meshMessageDeliveriesCap: 10,
-    meshMessageDeliveriesThreshold: 5,
+    meshMessageDeliveriesThreshold: 1,
     meshMessageDeliveriesWindow: 5.milliseconds,
     meshMessageDeliveriesActivation: 10.seconds,
     meshFailurePenaltyWeight: -1.0,
@@ -559,9 +559,9 @@ proc heartbeat(g: GossipSub) {.async.} =
       g.updateScores()
 
       for t in toSeq(g.topics.keys):
-        await g.rebalanceMesh(t)
-
         # prune every negative score peer
+        # do this before rebalance
+        # in order to avoid grafted -> pruned in the same cycle
         let meshPeers = g.mesh.getOrDefault(t)
         var prunes: seq[Future[void]]
         for peer in meshPeers:
@@ -571,6 +571,8 @@ proc heartbeat(g: GossipSub) {.async.} =
             prunes.add(peer.sendPrune(@[t]))
         prunes.allFinished.await.checkFutures()
 
+        await g.rebalanceMesh(t)
+
       g.dropFanoutPeers()
 
       # replenish known topics to the fanout
@@ -627,21 +629,20 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
       libp2p_gossipsub_peers_per_topic_fanout
         .set(g.fanout.peers(t).int64, labelValues = [t])
 
-  # TODO
-  # if peer.peerInfo.maintain:
-  #   for t in toSeq(g.explicit.keys):
-  #     g.explicit.removePeer(t, peer)
+    # TODO
+    # if peer.peerInfo.maintain:
+    #   for t in toSeq(g.explicit.keys):
+    #     g.explicit.removePeer(t, peer)
+    # g.explicitPeers.excl(peer.id)
 
-  g.explicitPeers.excl(peer.id)
-
-  # don't retain bad score peers
-  if peer.score < 0.0:
-    g.peerStats.del(peer)
-    return
+    # don't retain bad score peers
+    if peer.score < 0.0:
+      g.peerStats.del(peer)
+      return
 
-  g.peerStats[peer].expire = Moment.now() + g.parameters.retainScore
-  for topic, info in g.peerStats[peer].topicInfos.mpairs:
-    info.firstMessageDeliveries = 0
+    g.peerStats[peer].expire = Moment.now() + g.parameters.retainScore
+    for topic, info in g.peerStats[peer].topicInfos.mpairs:
+      info.firstMessageDeliveries = 0
 
 
 method subscribePeer*(p: GossipSub, conn: Connection) =
@@ -797,6 +798,19 @@ method rpcHandler*(g: GossipSub,
 
         if msgId in g.seen:
           trace "message already processed, skipping"
+
+          # make sure to update the score though before continuing
+          for t in msg.topicIDs:  # for every topic in the message
+            let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
+            # if in mesh add more delivery score
+            var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
+            if stats.inMesh:
+              stats.meshMessageDeliveries += 1
+              if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
+                stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
+
+            # commit back to the table
+            g.peerStats[peer].topicInfos[t] = stats
           continue
 
         trace "processing message"
diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim
index 4a14750a0..5dad7cd74 100644
--- a/tests/pubsub/testgossipsub.nim
+++ b/tests/pubsub/testgossipsub.nim
@@ -478,6 +478,8 @@ suite "GossipSub":
       nodes.add newStandardSwitch(triggerSelf = true,
                                   gossip = true,
                                   secureManagers = [SecureProtocol.Noise])
+      var gossipSub = GossipSub(nodes[i].pubSub.get())
+      gossipSub.parameters.floodPublish = false
       awaitters.add((await nodes[i].start()))
 
     let subscribes = await subscribeRandom(nodes)
@@ -537,6 +539,8 @@ suite "GossipSub":
      nodes.add newStandardSwitch(triggerSelf = true,
                                  gossip = true,
                                  secureManagers = [SecureProtocol.Secio])
+      var gossipSub = GossipSub(nodes[i].pubSub.get())
+      gossipSub.parameters.floodPublish = false
       awaitters.add((await nodes[i].start()))
 
     let subscribes = await subscribeSparseNodes(nodes, 1)