diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d684a4a36..4de07383e 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -217,7 +217,7 @@ proc init*(_: type[TopicParams]): TopicParams = meshMessageDeliveriesCap: 10, meshMessageDeliveriesThreshold: 5, meshMessageDeliveriesWindow: 5.milliseconds, - meshMessageDeliveriesActivation: 1.seconds, + meshMessageDeliveriesActivation: 10.seconds, meshFailurePenaltyWeight: -1.0, meshFailurePenaltyDecay: 0.5, invalidMessageDeliveriesWeight: -1.0, @@ -502,6 +502,7 @@ proc updateScores(g: GossipSub) = # avoid async topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight debug "p2", peer, p2 = info.firstMessageDeliveries + if info.meshMessageDeliveriesActive: if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold: let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries @@ -515,7 +516,7 @@ proc updateScores(g: GossipSub) = # avoid async topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight debug "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries - debug "updated peer topic's scores", peer, topic, info + debug "updated peer topic's scores", peer, topic, info, topicScore peer.score += topicScore * topicParams.topicWeight @@ -555,6 +556,16 @@ proc heartbeat(g: GossipSub) {.async.} = for t in toSeq(g.topics.keys): await g.rebalanceMesh(t) + # prune every negative score peer + let meshPeers = g.mesh.getOrDefault(t) + var prunes: seq[Future[void]] + for peer in meshPeers: + if peer.score < 0.0: + g.pruned(peer, t) + g.mesh.removePeer(t, peer) + prunes.add(peer.sendPrune(@[t])) + prunes.allFinished.await.checkFutures() + g.dropFanoutPeers() # replenish known topics to the fanout @@ -566,7 +577,7 @@ proc heartbeat(g: GossipSub) {.async.} = for peer in peers.keys: if peer in g.peers: sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer]))) - checkFutures(await allFinished(sent)) + sent.allFinished.await.checkFutures() g.mcache.shift() # shift the cache except CancelledError as exc: @@ -608,7 +619,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = g.explicitPeers.excl(peer.id) # don't retain bad score peers - if peer.score > 0: + if peer.score < 0.0: g.peerStats.del(peer) return