pruning improvements

This commit is contained in:
Giovanni Petrantoni 2020-07-28 14:09:37 +09:00
parent 880acd9dc1
commit a34eee7ed4
1 changed files with 15 additions and 4 deletions

View File

@ -217,7 +217,7 @@ proc init*(_: type[TopicParams]): TopicParams =
meshMessageDeliveriesCap: 10, meshMessageDeliveriesCap: 10,
meshMessageDeliveriesThreshold: 5, meshMessageDeliveriesThreshold: 5,
meshMessageDeliveriesWindow: 5.milliseconds, meshMessageDeliveriesWindow: 5.milliseconds,
meshMessageDeliveriesActivation: 1.seconds, meshMessageDeliveriesActivation: 10.seconds,
meshFailurePenaltyWeight: -1.0, meshFailurePenaltyWeight: -1.0,
meshFailurePenaltyDecay: 0.5, meshFailurePenaltyDecay: 0.5,
invalidMessageDeliveriesWeight: -1.0, invalidMessageDeliveriesWeight: -1.0,
@ -502,6 +502,7 @@ proc updateScores(g: GossipSub) = # avoid async
topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
debug "p2", peer, p2 = info.firstMessageDeliveries debug "p2", peer, p2 = info.firstMessageDeliveries
if info.meshMessageDeliveriesActive: if info.meshMessageDeliveriesActive:
if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold: if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
@ -515,7 +516,7 @@ proc updateScores(g: GossipSub) = # avoid async
topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
debug "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries debug "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries
debug "updated peer topic's scores", peer, topic, info debug "updated peer topic's scores", peer, topic, info, topicScore
peer.score += topicScore * topicParams.topicWeight peer.score += topicScore * topicParams.topicWeight
@ -555,6 +556,16 @@ proc heartbeat(g: GossipSub) {.async.} =
for t in toSeq(g.topics.keys): for t in toSeq(g.topics.keys):
await g.rebalanceMesh(t) await g.rebalanceMesh(t)
# prune every negative score peer
let meshPeers = g.mesh.getOrDefault(t)
var prunes: seq[Future[void]]
for peer in meshPeers:
if peer.score < 0.0:
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
prunes.add(peer.sendPrune(@[t]))
prunes.allFinished.await.checkFutures()
g.dropFanoutPeers() g.dropFanoutPeers()
# replenish known topics to the fanout # replenish known topics to the fanout
@ -566,7 +577,7 @@ proc heartbeat(g: GossipSub) {.async.} =
for peer in peers.keys: for peer in peers.keys:
if peer in g.peers: if peer in g.peers:
sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer]))) sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer])))
checkFutures(await allFinished(sent)) sent.allFinished.await.checkFutures()
g.mcache.shift() # shift the cache g.mcache.shift() # shift the cache
except CancelledError as exc: except CancelledError as exc:
@ -608,7 +619,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
g.explicitPeers.excl(peer.id) g.explicitPeers.excl(peer.id)
# don't retain bad score peers # don't retain bad score peers
if peer.score > 0: if peer.score < 0.0:
g.peerStats.del(peer) g.peerStats.del(peer)
return return