go-libp2p-pubsub/score.go

296 lines
8.0 KiB
Go

package pubsub
import (
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// PeerScoreParams holds the peer-wide scoring parameters together with the
// per-topic parameter sets.
type PeerScoreParams struct {
	// Score parameters per topic, keyed by topic.
	// Topics without an entry are not scored.
	Topics map[string]*TopicScoreParams
	// P5: Application-specific peer scoring.
	// AppSpecificScore is called with the peer ID; its result is multiplied by AppSpecificWeight.
	AppSpecificScore func(p peer.ID) float64
	AppSpecificWeight float64
	// P6: IP-colocation factor.
	// The parameter has an associated counter which counts the number of peers with the same IP.
	// If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value
	// is the square of the difference, ie (PeersInSameIP - IPColocationFactorThreshold)^2.
	// If the number of peers in the same IP does not exceed the threshold, then the value is 0.
	// The weight of the parameter MUST be negative, unless you want to disable for testing.
	// Note: In order to simulate many IPs in a manageable manner when testing, you can set the weight to 0
	// thus disabling the IP colocation penalty.
	IPColocationFactorWeight float64
	IPColocationFactorThreshold int
	// The decay interval for parameter counters.
	DecayInterval time.Duration
	// Counter value below which it is considered 0.
	DecayToZero float64
	// Time to remember counters for a disconnected peer.
	RetainScore time.Duration
}
// TopicScoreParams holds the scoring parameters (P1 through P4) that apply to
// a single topic, plus the weight with which the topic contributes to the
// overall peer score.
type TopicScoreParams struct {
	// TopicWeight scales the whole topic score before it is added to the peer score.
	TopicWeight float64

	// P1: time in the mesh.
	// This is the time the peer has been grafted in the mesh.
	// The value of the parameter is time/TimeInMeshQuantum, capped by TimeInMeshCap.
	// The weight of the parameter MUST be positive.

	// TimeInMeshWeight is the weight of P1.
	TimeInMeshWeight float64
	// TimeInMeshQuantum is the unit of time in which P1 is measured.
	TimeInMeshQuantum time.Duration
	// TimeInMeshCap is the upper bound for the value of P1.
	TimeInMeshCap float64

	// P2: first message deliveries.
	// This is the number of message deliveries in the topic.
	// The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay,
	// and capped by FirstMessageDeliveriesCap.
	// The weight of the parameter MUST be positive.

	// FirstMessageDeliveriesWeight is the weight of P2.
	FirstMessageDeliveriesWeight float64
	// FirstMessageDeliveriesDecay is the factor applied to the P2 counter on each decay.
	FirstMessageDeliveriesDecay float64
	// FirstMessageDeliveriesCap is the upper bound for the P2 counter.
	FirstMessageDeliveriesCap float64

	// P3: mesh message deliveries.
	// This is the number of message deliveries in the mesh, within
	// MeshMessageDeliveriesLatency of the first message delivery.
	// The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
	// If the counter exceeds MeshMessageDeliveriesThreshold, the value is 0.
	// If the counter is below MeshMessageDeliveriesThreshold, the value is the square of
	// the deficit, ie (MeshMessageDeliveriesThreshold - counter)^2.
	// The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
	// The weight of the parameter MUST be negative.

	// MeshMessageDeliveriesWeight is the weight of P3.
	MeshMessageDeliveriesWeight float64
	// MeshMessageDeliveriesDecay is the factor applied to the P3 counter on each decay.
	MeshMessageDeliveriesDecay float64
	// MeshMessageDeliveriesThreshold is the counter value below which the P3 penalty applies.
	MeshMessageDeliveriesThreshold float64
	// MeshMessageDeliveriesLatency is the delivery window after the first delivery of a message.
	MeshMessageDeliveriesLatency time.Duration
	// MeshMessageDeliveriesActivation is the time in the mesh after which P3 becomes active.
	MeshMessageDeliveriesActivation time.Duration

	// P4: invalid messages.
	// This is the number of invalid messages in the topic.
	// The value of the parameter is a counter, decaying with InvalidMessageDeliveriesDecay.
	// The weight of the parameter MUST be negative.

	// InvalidMessageDeliveriesWeight is the weight of P4.
	InvalidMessageDeliveriesWeight float64
	// InvalidMessageDeliveriesDecay is the factor applied to the P4 counter on each decay.
	InvalidMessageDeliveriesDecay float64
}
// peerStats is the per-peer state from which a peer's score is computed.
type peerStats struct {
	// true if the peer is currently connected
	connected bool
	// expiration time of the score stats for disconnected peers;
	// refreshScores drops the entry once this time has passed while the peer is not connected
	expire time.Time
	// per topic stats, keyed by topic
	topics map[string]*topicStats
	// IP tracking; store as string for easy processing.
	// Score looks each of these up in peerScore.peerIPs to count colocated peers.
	ips []string
}
// topicStats holds the per-topic counters and mesh state tracked for a single peer.
type topicStats struct {
	// true if the peer is in the mesh
	inMesh bool
	// time when the peer was (last) GRAFTed; valid only when in mesh
	graftTime time.Time
	// time in mesh (updated during refresh/decay to avoid calling gettimeofday on
	// every score invocation)
	meshTime time.Duration
	// first message deliveries counter (P2)
	firstMessageDeliveries float64
	// mesh message deliveries counter (P3)
	meshMessageDeliveries float64
	// true if the peer has been in the mesh long enough to activate the mesh message deliveries penalty
	meshMessageDeliveriesActive bool
	// invalid message counter (P4)
	invalidMessageDeliveries float64
}
// peerScore tracks per-peer statistics and computes peer scores from them.
type peerScore struct {
	// Score and refreshScores hold this lock while reading or updating the fields below.
	sync.Mutex
	// the score parameters
	params *PeerScoreParams
	// per peer stats for score calculation
	peerStats map[peer.ID]*peerStats
	// IP colocation tracking: for each IP (as a string), the set of peers seen on it.
	// The size of a set is the colocation count used for P6.
	peerIPs map[string]map[peer.ID]struct{}
}
// newPeerScore is the constructor for peerScore.
// It is currently a placeholder: params is ignored and nil is always returned.
// Score accepts a nil receiver and reports 0 for it.
// NOTE(review): refreshScores locks the receiver and is not nil-safe — confirm
// callers never invoke it on the value returned here.
func newPeerScore(params *PeerScoreParams) *peerScore {
	return nil
}
// router interface

// Start currently has an empty body: gs is ignored and nothing is started.
// NOTE(review): presumably meant to attach the scorer to the router and launch
// the periodic maintenance (refreshScores / resfreshIPs) — confirm once implemented.
func (ps *peerScore) Start(gs *GossipSubRouter) {
}
// Score computes the current score of peer p from its tracked statistics.
//
// The score is the sum of:
//   - for every topic of the peer that has an entry in params.Topics, the topic
//     score (P1 time in mesh, P2 first message deliveries, P3 mesh message
//     delivery deficit, P4 invalid messages) multiplied by TopicWeight;
//   - P5: the application-specific score multiplied by AppSpecificWeight;
//   - P6: the IP colocation penalty, evaluated for every IP tracked for the peer.
//
// It returns 0 when called on a nil *peerScore and when no stats are tracked
// for p. The receiver's lock is held for the duration of the call.
func (ps *peerScore) Score(p peer.ID) float64 {
	if ps == nil {
		return 0
	}

	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return 0
	}

	var score float64

	// topic scores
	for topic, tstats := range pstats.topics {
		// the topic parameters
		topicParams, ok := ps.params.Topics[topic]
		if !ok {
			// we are not scoring this topic
			continue
		}

		// the topic score
		var topicScore float64

		// P1: time in mesh, counted in whole quanta.
		// A non-positive quantum would make the integer division below panic
		// (or yield a negative time), so P1 is skipped in that case.
		if tstats.inMesh && topicParams.TimeInMeshQuantum > 0 {
			p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum)
			// Apply the documented cap. A non-positive cap is treated as "no cap" so
			// that parameter sets which leave TimeInMeshCap unset keep their P1 value.
			if topicParams.TimeInMeshCap > 0 && p1 > topicParams.TimeInMeshCap {
				p1 = topicParams.TimeInMeshCap
			}
			topicScore += p1 * topicParams.TimeInMeshWeight
		}

		// P2: first message deliveries
		p2 := tstats.firstMessageDeliveries
		topicScore += p2 * topicParams.FirstMessageDeliveriesWeight

		// P3: mesh message deliveries; penalize the squared deficit below the
		// threshold, but only once the parameter has been activated for this peer.
		if tstats.meshMessageDeliveriesActive &&
			tstats.meshMessageDeliveries < topicParams.MeshMessageDeliveriesThreshold {
			deficit := topicParams.MeshMessageDeliveriesThreshold - tstats.meshMessageDeliveries
			p3 := deficit * deficit
			topicScore += p3 * topicParams.MeshMessageDeliveriesWeight
		}

		// P4: invalid messages
		p4 := tstats.invalidMessageDeliveries
		topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight

		// update score, mixing with topic weight
		score += topicScore * topicParams.TopicWeight
	}

	// P5: application-specific score.
	// Calling a nil func value panics, so an unset callback contributes nothing.
	if ps.params.AppSpecificScore != nil {
		p5 := ps.params.AppSpecificScore(p)
		score += p5 * ps.params.AppSpecificWeight
	}

	// P6: IP colocation factor; penalize the squared surplus of peers sharing an IP.
	for _, ip := range pstats.ips {
		peersInIP := len(ps.peerIPs[ip])
		if peersInIP > ps.params.IPColocationFactorThreshold {
			surplus := float64(peersInIP - ps.params.IPColocationFactorThreshold)
			p6 := surplus * surplus
			score += p6 * ps.params.IPColocationFactorWeight
		}
	}

	return score
}
// periodic maintenance

// refreshScores performs one maintenance pass over all tracked peers while
// holding the receiver's lock.
//
// Disconnected peers are dropped once their retention period has expired,
// together with their entries in the IP colocation index; until then their
// counters are left untouched. For connected peers, the counters of every
// scored topic are decayed (and zeroed once they fall below DecayToZero), the
// time in mesh is refreshed, and the mesh message deliveries parameter is
// activated once the peer has been in the mesh for longer than
// MeshMessageDeliveriesActivation.
func (ps *peerScore) refreshScores() {
	ps.Lock()
	defer ps.Unlock()

	now := time.Now()

	// decay applies one decay step to a counter and snaps it to 0 once it falls
	// below DecayToZero, so counters do not linger at vanishingly small values.
	decay := func(counter, factor float64) float64 {
		counter *= factor
		if counter < ps.params.DecayToZero {
			return 0
		}
		return counter
	}

	// deleting the entry being visited while ranging over a map is permitted in Go
	for p, pstats := range ps.peerStats {
		if !pstats.connected {
			// has the retention period expired?
			if now.After(pstats.expire) {
				// yes, throw it away; clean up the IP tracking first so that the
				// forgotten peer no longer counts towards the colocation factor
				// of other peers on the same IPs.
				for _, ip := range pstats.ips {
					peers := ps.peerIPs[ip]
					delete(peers, p) // no-op if the IP is not tracked
					if len(peers) == 0 {
						delete(ps.peerIPs, ip)
					}
				}
				delete(ps.peerStats, p)
			}

			// we don't decay retained scores, as the peer is not active.
			// this way the peer cannot reset a negative score by simply disconnecting and reconnecting,
			// unless the retention period has elapsed.
			// similarly, a well behaved peer does not lose its score by getting disconnected.
			continue
		}

		for topic, tstats := range pstats.topics {
			// the topic parameters
			topicParams, ok := ps.params.Topics[topic]
			if !ok {
				// we are not scoring this topic
				continue
			}

			// decay counters
			tstats.firstMessageDeliveries = decay(tstats.firstMessageDeliveries, topicParams.FirstMessageDeliveriesDecay)
			tstats.meshMessageDeliveries = decay(tstats.meshMessageDeliveries, topicParams.MeshMessageDeliveriesDecay)
			tstats.invalidMessageDeliveries = decay(tstats.invalidMessageDeliveries, topicParams.InvalidMessageDeliveriesDecay)

			// update mesh time and activate mesh message delivery parameter if need be
			if tstats.inMesh {
				tstats.meshTime = now.Sub(tstats.graftTime)
				if tstats.meshTime > topicParams.MeshMessageDeliveriesActivation {
					tstats.meshMessageDeliveriesActive = true
				}
			}
		}
	}
}
// resfreshIPs currently has an empty body and does nothing.
// NOTE(review): the name looks like a misspelling of "refreshIPs"; it is left
// unchanged here because callers may exist outside this file — confirm before renaming.
func (ps *peerScore) resfreshIPs() {
	// peer IPs may change, so we periodically refresh them
}
// tracer interface

// AddPeer currently has an empty body: p and proto are ignored.
// NOTE(review): presumably meant to start tracking stats for a newly connected
// peer — confirm once implemented.
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
}
// RemovePeer currently has an empty body: p is ignored.
// NOTE(review): presumably meant to mark the peer as disconnected and set the
// expiry of its retained stats — confirm once implemented.
func (ps *peerScore) RemovePeer(p peer.ID) {
}
// Join currently has an empty body: topic is ignored.
func (ps *peerScore) Join(topic string) {
}
// Leave currently has an empty body: topic is ignored.
func (ps *peerScore) Leave(topic string) {
}
// Graft currently has an empty body: p and topic are ignored.
// NOTE(review): presumably meant to record that the peer entered the mesh for
// the topic (topicStats.inMesh / graftTime) — confirm once implemented.
func (ps *peerScore) Graft(p peer.ID, topic string) {
}
// Prune currently has an empty body: p and topic are ignored.
// NOTE(review): presumably meant to record that the peer left the mesh for the
// topic — confirm once implemented.
func (ps *peerScore) Prune(p peer.ID, topic string) {
}
// DeliverMessage currently has an empty body: msg is ignored.
// NOTE(review): presumably meant to update the message delivery counters of
// the delivering peer — confirm once implemented.
func (ps *peerScore) DeliverMessage(msg *Message) {
}
// RejectMessage currently has an empty body: msg and reason are ignored.
// NOTE(review): presumably meant to update the invalid message counter of the
// sending peer — confirm once implemented.
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
}
// DuplicateMessage currently has an empty body: msg is ignored.
// NOTE(review): presumably meant to credit mesh message deliveries for
// duplicates that arrive within the latency window — confirm once implemented.
func (ps *peerScore) DuplicateMessage(msg *Message) {
}