score fixes

Giovanni Petrantoni 2020-08-08 18:27:57 +09:00
parent 4dcc3b6dda
commit 66c9cd2c8c
2 changed files with 34 additions and 16 deletions


@@ -218,7 +218,7 @@ proc init*(_: type[TopicParams]): TopicParams =
     meshMessageDeliveriesWeight: -1.0,
     meshMessageDeliveriesDecay: 0.5,
     meshMessageDeliveriesCap: 10,
-    meshMessageDeliveriesThreshold: 5,
+    meshMessageDeliveriesThreshold: 1,
     meshMessageDeliveriesWindow: 5.milliseconds,
     meshMessageDeliveriesActivation: 10.seconds,
     meshFailurePenaltyWeight: -1.0,
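Context for the threshold change (not part of the diff): in the gossipsub v1.1 scoring rules, a mesh peer that delivers fewer than meshMessageDeliveriesThreshold messages after the activation window is penalised by the squared deficit times meshMessageDeliveriesWeight. A minimal sketch with illustrative names:

proc meshDeliveryPenalty(deliveries, threshold, weight: float64): float64 =
  # squared-deficit penalty per the gossipsub v1.1 scoring spec;
  # `weight` is negative (-1.0 in the params above), so this lowers the score
  if deliveries < threshold:
    let deficit = threshold - deliveries
    result = deficit * deficit * weight

echo meshDeliveryPenalty(0.0, 5.0, -1.0)  # -25.0 under the old threshold of 5
echo meshDeliveryPenalty(0.0, 1.0, -1.0)  # -1.0 under the new threshold of 1

Dropping the threshold from 5 to 1 therefore softens the worst-case penalty for a quiet mesh peer from -25 to -1 per topic.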
@@ -559,9 +559,9 @@ proc heartbeat(g: GossipSub) {.async.} =
     g.updateScores()
     for t in toSeq(g.topics.keys):
-      await g.rebalanceMesh(t)
       # prune every negative score peer
+      # do this before rebalance
+      # in order to avoid grafted -> pruned in the same cycle
       let meshPeers = g.mesh.getOrDefault(t)
       var prunes: seq[Future[void]]
       for peer in meshPeers:
@@ -571,6 +571,8 @@ proc heartbeat(g: GossipSub) {.async.} =
           prunes.add(peer.sendPrune(@[t]))
       prunes.allFinished.await.checkFutures()
+
+      await g.rebalanceMesh(t)

     g.dropFanoutPeers()
     # replenish known topics to the fanout
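The net effect of these two heartbeat hunks: updateScores runs first, negative-score peers are pruned from each topic's mesh, and only then does rebalanceMesh graft replacements. Previously rebalanceMesh ran at the top of the loop, so a peer could be grafted and then pruned again within the same heartbeat; moving the call avoids that churn.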
@@ -627,21 +629,20 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
       libp2p_gossipsub_peers_per_topic_fanout
         .set(g.fanout.peers(t).int64, labelValues = [t])

   # TODO
   # if peer.peerInfo.maintain:
   #   for t in toSeq(g.explicit.keys):
   #     g.explicit.removePeer(t, peer)
-  g.explicitPeers.excl(peer.id)
+  #   g.explicitPeers.excl(peer.id)

   # don't retain bad score peers
   if peer.score < 0.0:
     g.peerStats.del(peer)
     return

   g.peerStats[peer].expire = Moment.now() + g.parameters.retainScore
   for topic, info in g.peerStats[peer].topicInfos.mpairs:
     info.firstMessageDeliveries = 0

 method subscribePeer*(p: GossipSub,
                       conn: Connection) =
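What the disconnect path now does, as a self-contained sketch (hypothetical types; the real code uses chronos Moments and per-topic stats):

import std/tables

type PeerStats = object
  score: float64
  expire: int64          # stand-in for the chronos Moment used upstream

var peerStats = initTable[string, PeerStats]()

proc onDisconnect(id: string; now, retainScore: int64) =
  # mirrors the branch above: negative-score peers lose their stats right away,
  # everyone else keeps them until `retainScore` elapses
  if id notin peerStats: return
  if peerStats[id].score < 0.0:
    peerStats.del(id)
  else:
    peerStats[id].expire = now + retainScore
    # the real code also resets firstMessageDeliveries for every topic here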
@@ -797,6 +798,19 @@ method rpcHandler*(g: GossipSub,
       if msgId in g.seen:
         trace "message already processed, skipping"
+
+        # make sure to update the score though before continuing
+        for t in msg.topicIDs:  # for every topic in the message
+          let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
+          # if in mesh add more delivery score
+          var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
+          if stats.inMesh:
+            stats.meshMessageDeliveries += 1
+            if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
+              stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
+
+          # commit back to the table
+          g.peerStats[peer].topicInfos[t] = stats
         continue
       trace "processing message"


@@ -478,6 +478,8 @@ suite "GossipSub":
       nodes.add newStandardSwitch(triggerSelf = true,
                                   gossip = true,
                                   secureManagers = [SecureProtocol.Noise])
+      var gossipSub = GossipSub(nodes[i].pubSub.get())
+      gossipSub.parameters.floodPublish = false
       awaitters.add((await nodes[i].start()))

     let subscribes = await subscribeRandom(nodes)
@@ -537,6 +539,8 @@ suite "GossipSub":
       nodes.add newStandardSwitch(triggerSelf = true,
                                   gossip = true,
                                   secureManagers = [SecureProtocol.Secio])
+      var gossipSub = GossipSub(nodes[i].pubSub.get())
+      gossipSub.parameters.floodPublish = false
       awaitters.add((await nodes[i].start()))

     let subscribes = await subscribeSparseNodes(nodes, 1)
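Both test fixtures now cast the switch's pubsub to GossipSub and turn floodPublish off, presumably so published messages propagate through mesh links rather than being flooded to every known peer, which keeps these tests exercising the mesh paths the scoring changes above touch.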