diff --git a/score.go b/score.go index 9ecfa54..393b982 100644 --- a/score.go +++ b/score.go @@ -1,6 +1,7 @@ package pubsub import ( + "sync" "time" "github.com/libp2p/go-libp2p-core/peer" @@ -11,6 +12,10 @@ type PeerScoreParams struct { // Score parameters per topic. Topics map[string]*TopicScoreParams + // P5: Application-specific peer scoring + AppSpecificScore func(p peer.ID) float64 + AppSpecificWeight float64 + // P6: IP-colocation factor. // The parameter has an associated counter which counts the number of peers with the same IP. // If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value @@ -22,10 +27,6 @@ type PeerScoreParams struct { IPColocationFactorWeight float64 IPColocationFactorThreshold int - // Application-specific peer scoring - AppSpecificScore func(p peer.ID) float64 - AppSpecificWeight float64 - // the decay interval for parameter counters. DecayInterval time.Duration @@ -46,7 +47,7 @@ type TopicScoreParams struct { // The weight of the parameter MUST be positive. TimeInMeshWeight float64 TimeInMeshQuantum time.Duration - TimeInMeshCap int + TimeInMeshCap float64 // P2: first message deliveries // This is the number of message deliveries in the topic. @@ -54,7 +55,7 @@ type TopicScoreParams struct { // by FirstMessageDeliveriesCap. // The weight of the parameter MUST be positive. FirstMessageDeliveriesWeight, FirstMessageDeliveriesDecay float64 - FirstMessageDeliveriesCap int + FirstMessageDeliveriesCap float64 // P3: mesh message deliveries // This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesLatency of @@ -66,7 +67,7 @@ type TopicScoreParams struct { // The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh. // The weight of the parameter MUST be negative. MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay float64 - MeshMessageDeliveriesThreshold int + MeshMessageDeliveriesThreshold float64 MeshMessageDeliveriesLatency, MeshMessageDeliveriesActivation time.Duration // P4: invalid messages @@ -76,7 +77,55 @@ type TopicScoreParams struct { InvalidMessageDeliveriesWeight, InvalidMessageDeliveriesDecay float64 } +type peerStats struct { + // true if the peer is currently connected + connected bool + + // expiration time of the score stats for disconnected peers + expire time.Time + + // per topc stats + topics map[string]*topicStats + + // IP tracking; store as string for easy processing + ips []string +} + +type topicStats struct { + // true if the peer is in the mesh + inMesh bool + + // time when the peer was (last) GRAFTed; valid only when in mesh + graftTime time.Time + + // time in mesh (updated during refresh/decay to avoid calling gettimeofday on + // every score invocation) + meshTime time.Duration + + // first message deliveries + firstMessageDeliveries float64 + + // mesh message deliveries + meshMessageDeliveries float64 + + // true if the peer has been enough time in the mesh to activate mess message deliveries + meshMessageDeliveriesActive bool + + // invalid message counter + invalidMessageDeliveries float64 +} + type peerScore struct { + sync.Mutex + + // the score parameters + params *PeerScoreParams + + // per peer stats for score calculation + peerStats map[peer.ID]*peerStats + + // IP colocation tracking + peerIPs map[string]peer.ID } func newPeerScore(params *PeerScoreParams) *peerScore { @@ -89,7 +138,70 @@ func (ps *peerScore) Start(gs *GossipSubRouter) { } func (ps *peerScore) Score(p peer.ID) float64 { - return 0 + ps.Lock() + defer ps.Unlock() + + pstats, ok := ps.peerStats[p] + if !ok { + return 0 + } + + var score float64 + + // topic scores + for topic, tstats := range pstats.topics { + // the topic score + var topicScore float64 + + // the topic parameters + topicParams, ok := ps.params.Topics[topic] + if !ok { + // we are not scoring this topic + continue + } + + // P1: time in Mesh + if tstats.inMesh { + p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum) + topicScore += p1 * topicParams.TimeInMeshWeight + } + + // P2: first message deliveries + p2 := tstats.firstMessageDeliveries + topicScore += p2 * topicParams.FirstMessageDeliveriesWeight + + // P3: mesh message deliveries + if tstats.meshMessageDeliveriesActive { + if tstats.meshMessageDeliveries < topicParams.MeshMessageDeliveriesThreshold { + deficit := topicParams.MeshMessageDeliveriesThreshold - tstats.meshMessageDeliveries + p3 := deficit * deficit + topicScore += p3 * topicParams.MeshMessageDeliveriesWeight + } + } + + // P4: invalid messages + p4 := tstats.invalidMessageDeliveries + topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight + + // update score, mixing with topic weight + score += topicScore * topicParams.TopicWeight + } + + // P5: application-specific score + p5 := ps.params.AppSpecificScore(p) + score += p5 * ps.params.AppSpecificWeight + + // P6: IP collocation factor + for _, ip := range pstats.ips { + peersInIP := len(ps.peerIPs[ip]) + if peersInIP > ps.params.IPColocationFactorThreshold { + surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold) + p6 := surpluss * surpluss + score += p6 * ps.params.IPColocationFactorWeight + } + } + + return score } // tracer interface