score wip

This commit is contained in:
Giovanni Petrantoni 2020-07-21 11:25:14 +09:00
parent 0776cc77e7
commit 884d05cbc2
1 changed files with 34 additions and 9 deletions

View File

@ -63,6 +63,7 @@ type
firstMessageDeliveries: float64 firstMessageDeliveries: float64
meshMessageDeliveries: float64 meshMessageDeliveries: float64
meshFailurePenalty: float64 meshFailurePenalty: float64
invalidMessageDeliveries: float64
TopicParams* = object TopicParams* = object
topicWeight*: float64 topicWeight*: float64
@ -470,16 +471,19 @@ proc updateScores(g: GossipSub) = # avoid async
for peer, stats in g.peerStats.mpairs: for peer, stats in g.peerStats.mpairs:
debug "updating peer score", peer, gossipTopics = peer.topics.len debug "updating peer score", peer, gossipTopics = peer.topics.len
# TODO
if not peer.connected: if not peer.connected:
if now > stats.expire: if now > stats.expire:
discard # really cleanup peer g.peerStats.del(peer)
debug "evicted peer from memory", peer
continue
# Per topic # Per topic
for topic in peer.topics: for topic in peer.topics:
var topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init()) var topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
var info = stats.topicInfos.getOrDefault(topic) var info = stats.topicInfos.getOrDefault(topic)
# Scoring
var topicScore = 0'f64 var topicScore = 0'f64
if info.inMesh: if info.inMesh:
@ -507,18 +511,35 @@ proc updateScores(g: GossipSub) = # avoid async
topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
debug "p3b", peer, p3b = info.meshFailurePenalty debug "p3b", peer, p3b = info.meshFailurePenalty
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?) topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
g.topicParams[topic] = topicParams debug "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries
stats.topicInfos[topic] = info
debug "updated peer topic's scores", peer, topic, info debug "updated peer topic's scores", peer, topic, info
peer.score += topicScore * topicParams.topicWeight peer.score += topicScore * topicParams.topicWeight
# debug assert to check nim compiler is doing what we are asking... # Score decay
assert(g.peerStats[peer].topicInfos[topic].meshTime == info.meshTime) info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
if info.firstMessageDeliveries < g.parameters.decayToZero:
info.firstMessageDeliveries = 0
info.meshMessageDeliveries *= topicParams.meshMessageDeliveriesDecay
if info.meshMessageDeliveries < g.parameters.decayToZero:
info.meshMessageDeliveries = 0
info.meshFailurePenalty *= topicParams.meshFailurePenaltyDecay
if info.meshFailurePenalty < g.parameters.decayToZero:
info.meshFailurePenalty = 0
info.invalidMessageDeliveries *= topicParams.invalidMessageDeliveriesDecay
if info.invalidMessageDeliveries < g.parameters.decayToZero:
info.invalidMessageDeliveries = 0
# Wrap up
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
stats.topicInfos[topic] = info
debug "updated peer's score", peer, score = peer.score debug "updated peer's score", peer, score = peer.score
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
@ -739,6 +760,10 @@ method rpcHandler*(g: GossipSub,
if not (await g.validate(msg)): if not (await g.validate(msg)):
trace "dropping message due to failed validation" trace "dropping message due to failed validation"
for t in msg.topicIDs:
var tstats = g.peerStats[peer].topicInfos.getOrDefault(t)
tstats.invalidMessageDeliveries += 1
g.peerStats[peer].topicInfos[t] = tstats
continue continue
# this shouldn't happen # this shouldn't happen