2020-03-01 12:29:24 +00:00
|
|
|
package pubsub
|
|
|
|
|
|
|
|
import (
|
2020-03-09 11:37:23 +00:00
|
|
|
"context"
|
2020-03-09 12:08:42 +00:00
|
|
|
"fmt"
|
2020-03-27 18:18:38 +00:00
|
|
|
"net"
|
2020-03-07 17:43:01 +00:00
|
|
|
"sync"
|
2020-03-07 13:46:41 +00:00
|
|
|
"time"
|
|
|
|
|
2020-03-09 11:22:57 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/host"
|
2020-03-01 12:29:24 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
"github.com/libp2p/go-libp2p-core/protocol"
|
2020-03-09 11:22:57 +00:00
|
|
|
|
2020-03-27 18:18:38 +00:00
|
|
|
manet "github.com/multiformats/go-multiaddr-net"
|
2020-03-01 12:29:24 +00:00
|
|
|
)
|
|
|
|
|
2020-03-07 17:43:01 +00:00
|
|
|
type peerStats struct {
|
|
|
|
// true if the peer is currently connected
|
|
|
|
connected bool
|
|
|
|
|
|
|
|
// expiration time of the score stats for disconnected peers
|
|
|
|
expire time.Time
|
|
|
|
|
|
|
|
// per topc stats
|
|
|
|
topics map[string]*topicStats
|
|
|
|
|
|
|
|
// IP tracking; store as string for easy processing
|
|
|
|
ips []string
|
|
|
|
}
|
|
|
|
|
|
|
|
// topicStats is the per-topic scoring state kept for a single peer.
type topicStats struct {
	// inMesh is true while the peer is in the mesh for this topic.
	inMesh bool

	// graftTime is when the peer was (last) GRAFTed; only meaningful while inMesh is true.
	graftTime time.Time

	// meshTime is the time spent in the mesh. It is refreshed during the periodic
	// refresh/decay pass so that computing a score does not need to read the clock.
	meshTime time.Duration

	// firstMessageDeliveries counts first message deliveries.
	firstMessageDeliveries float64

	// meshMessageDeliveries counts mesh message deliveries.
	meshMessageDeliveries float64

	// meshMessageDeliveriesActive is true once the peer has been in the mesh long enough
	// for the mesh message delivery component to be taken into account.
	meshMessageDeliveriesActive bool

	// meshFailurePenalty is the sticky mesh rate failure penalty counter.
	meshFailurePenalty float64

	// invalidMessageDeliveries counts invalid messages.
	invalidMessageDeliveries float64
}
|
|
|
|
|
2020-03-07 13:46:41 +00:00
|
|
|
type peerScore struct {
|
2020-03-07 17:43:01 +00:00
|
|
|
sync.Mutex
|
|
|
|
|
|
|
|
// the score parameters
|
|
|
|
params *PeerScoreParams
|
|
|
|
|
|
|
|
// per peer stats for score calculation
|
|
|
|
peerStats map[peer.ID]*peerStats
|
|
|
|
|
|
|
|
// IP colocation tracking
|
2020-03-07 19:11:45 +00:00
|
|
|
peerIPs map[string]map[peer.ID]struct{}
|
2020-03-08 14:48:45 +00:00
|
|
|
|
|
|
|
// message delivery tracking
|
|
|
|
deliveries *messageDeliveries
|
|
|
|
|
|
|
|
msgID MsgIdFunction
|
2020-03-09 11:22:57 +00:00
|
|
|
host host.Host
|
2020-03-17 12:14:56 +00:00
|
|
|
|
|
|
|
// debugging inspection
|
|
|
|
inspect PeerScoreInspectFn
|
|
|
|
inspectPeriod time.Duration
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type messageDeliveries struct {
|
2020-03-09 11:07:37 +00:00
|
|
|
records map[string]*deliveryRecord
|
|
|
|
|
|
|
|
// queue for cleaning up old delivery records
|
|
|
|
head *deliveryEntry
|
|
|
|
tail *deliveryEntry
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type deliveryRecord struct {
|
|
|
|
status int
|
2020-03-08 20:44:45 +00:00
|
|
|
validated time.Time
|
2020-03-08 20:08:53 +00:00
|
|
|
peers map[peer.ID]struct{}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-09 11:07:37 +00:00
|
|
|
// deliveryEntry is a node in the cleanup queue of messageDeliveries.
type deliveryEntry struct {
	// id is the message ID of the record this entry refers to.
	id string

	// expire is the time after which the record may be garbage collected.
	expire time.Time

	// next links to the following (younger) entry in the queue, or nil at the tail.
	next *deliveryEntry
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
// Status values stored in deliveryRecord.status.
const (
	// we don't know (yet) if the message is valid
	delivery_unknown = iota

	// we know the message is valid
	delivery_valid

	// we know the message is invalid
	delivery_invalid

	// we can't tell if it is valid because validation throttled
	delivery_throttled
)
|
|
|
|
|
2020-03-17 12:14:56 +00:00
|
|
|
// PeerScoreInspectFn is the callback type accepted by WithPeerScoreInspect. It is invoked
// with a map from peer ID to that peer's score at the time of the call.
type PeerScoreInspectFn func(map[peer.ID]float64)
|
|
|
|
|
2020-03-27 12:10:12 +00:00
|
|
|
// WithPeerScoreInspect is a gossipsub router option that enables peer score debugging.
|
2020-03-17 12:14:56 +00:00
|
|
|
// When this option is enabled, the supplied function will be invoked periodically to allow
|
|
|
|
// the application to inspec or dump the scores for connected peers.
|
|
|
|
// This option must be passed _after_ the WithPeerScore option.
|
2020-03-27 12:10:12 +00:00
|
|
|
func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Option {
|
2020-03-17 12:14:56 +00:00
|
|
|
return func(ps *PubSub) error {
|
|
|
|
gs, ok := ps.rt.(*GossipSubRouter)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("pubsub router is not gossipsub")
|
|
|
|
}
|
|
|
|
|
|
|
|
if gs.score == nil {
|
|
|
|
return fmt.Errorf("peer scoring is not enabled")
|
|
|
|
}
|
|
|
|
|
|
|
|
gs.score.inspect = inspect
|
|
|
|
gs.score.inspectPeriod = period
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// implementation
|
2020-03-07 15:59:22 +00:00
|
|
|
func newPeerScore(params *PeerScoreParams) *peerScore {
|
2020-03-09 11:37:23 +00:00
|
|
|
return &peerScore{
|
|
|
|
params: params,
|
|
|
|
peerStats: make(map[peer.ID]*peerStats),
|
|
|
|
peerIPs: make(map[string]map[peer.ID]struct{}),
|
|
|
|
deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)},
|
|
|
|
msgID: DefaultMsgIdFn,
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// router interface
|
2020-03-07 15:59:22 +00:00
|
|
|
func (ps *peerScore) Start(gs *GossipSubRouter) {
|
2020-03-16 19:27:52 +00:00
|
|
|
if ps == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-03-09 11:37:23 +00:00
|
|
|
ps.msgID = gs.p.msgID
|
|
|
|
ps.host = gs.p.host
|
|
|
|
go ps.background(gs.p.ctx)
|
2020-03-07 15:59:22 +00:00
|
|
|
}
|
|
|
|
|
2020-03-02 13:05:24 +00:00
|
|
|
func (ps *peerScore) Score(p peer.ID) float64 {
|
2020-03-07 23:54:40 +00:00
|
|
|
if ps == nil {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
2020-03-07 17:43:01 +00:00
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
2020-03-17 12:14:56 +00:00
|
|
|
return ps.score(p)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ps *peerScore) score(p peer.ID) float64 {
|
2020-03-07 17:43:01 +00:00
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
|
|
|
var score float64
|
|
|
|
|
|
|
|
// topic scores
|
|
|
|
for topic, tstats := range pstats.topics {
|
|
|
|
// the topic score
|
|
|
|
var topicScore float64
|
|
|
|
|
|
|
|
// the topic parameters
|
|
|
|
topicParams, ok := ps.params.Topics[topic]
|
|
|
|
if !ok {
|
|
|
|
// we are not scoring this topic
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// P1: time in Mesh
|
|
|
|
if tstats.inMesh {
|
|
|
|
p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum)
|
2020-03-08 08:49:19 +00:00
|
|
|
if p1 > topicParams.TimeInMeshCap {
|
|
|
|
p1 = topicParams.TimeInMeshCap
|
|
|
|
}
|
2020-03-07 17:43:01 +00:00
|
|
|
topicScore += p1 * topicParams.TimeInMeshWeight
|
|
|
|
}
|
|
|
|
|
|
|
|
// P2: first message deliveries
|
|
|
|
p2 := tstats.firstMessageDeliveries
|
|
|
|
topicScore += p2 * topicParams.FirstMessageDeliveriesWeight
|
|
|
|
|
|
|
|
// P3: mesh message deliveries
|
|
|
|
if tstats.meshMessageDeliveriesActive {
|
|
|
|
if tstats.meshMessageDeliveries < topicParams.MeshMessageDeliveriesThreshold {
|
|
|
|
deficit := topicParams.MeshMessageDeliveriesThreshold - tstats.meshMessageDeliveries
|
|
|
|
p3 := deficit * deficit
|
|
|
|
topicScore += p3 * topicParams.MeshMessageDeliveriesWeight
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-08 22:08:38 +00:00
|
|
|
// P3b:
|
|
|
|
p3b := tstats.meshFailurePenalty
|
|
|
|
topicScore += p3b * topicParams.MeshFailurePenaltyWeight
|
|
|
|
|
2020-03-07 17:43:01 +00:00
|
|
|
// P4: invalid messages
|
|
|
|
p4 := tstats.invalidMessageDeliveries
|
|
|
|
topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight
|
|
|
|
|
|
|
|
// update score, mixing with topic weight
|
|
|
|
score += topicScore * topicParams.TopicWeight
|
|
|
|
}
|
|
|
|
|
2020-03-27 14:46:41 +00:00
|
|
|
// apply the topic score cap, if any
|
|
|
|
if ps.params.TopicScoreCap > 0 && score > ps.params.TopicScoreCap {
|
|
|
|
score = ps.params.TopicScoreCap
|
|
|
|
}
|
|
|
|
|
2020-03-07 17:43:01 +00:00
|
|
|
// P5: application-specific score
|
|
|
|
p5 := ps.params.AppSpecificScore(p)
|
|
|
|
score += p5 * ps.params.AppSpecificWeight
|
|
|
|
|
|
|
|
// P6: IP collocation factor
|
|
|
|
for _, ip := range pstats.ips {
|
2020-03-27 18:21:42 +00:00
|
|
|
_, whitelisted := ps.params.IPColocationFactorWhitelist[ip]
|
|
|
|
if whitelisted {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-03-07 17:43:01 +00:00
|
|
|
peersInIP := len(ps.peerIPs[ip])
|
|
|
|
if peersInIP > ps.params.IPColocationFactorThreshold {
|
|
|
|
surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold)
|
|
|
|
p6 := surpluss * surpluss
|
|
|
|
score += p6 * ps.params.IPColocationFactorWeight
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return score
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-07 19:09:50 +00:00
|
|
|
// periodic maintenance
|
2020-03-09 11:37:23 +00:00
|
|
|
func (ps *peerScore) background(ctx context.Context) {
|
|
|
|
refreshScores := time.NewTicker(ps.params.DecayInterval)
|
|
|
|
defer refreshScores.Stop()
|
|
|
|
|
|
|
|
refreshIPs := time.NewTicker(time.Minute)
|
|
|
|
defer refreshIPs.Stop()
|
|
|
|
|
|
|
|
gcDeliveryRecords := time.NewTicker(time.Minute)
|
|
|
|
defer gcDeliveryRecords.Stop()
|
|
|
|
|
2020-03-17 12:14:56 +00:00
|
|
|
var inspectScores <-chan time.Time
|
|
|
|
if ps.inspect != nil {
|
|
|
|
ticker := time.NewTicker(ps.inspectPeriod)
|
|
|
|
defer ticker.Stop()
|
2020-03-27 12:10:12 +00:00
|
|
|
// also dump at exit for one final sample
|
|
|
|
defer ps.inspectScores()
|
2020-03-17 12:14:56 +00:00
|
|
|
inspectScores = ticker.C
|
|
|
|
}
|
|
|
|
|
2020-03-09 11:37:23 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-refreshScores.C:
|
|
|
|
ps.refreshScores()
|
|
|
|
|
|
|
|
case <-refreshIPs.C:
|
|
|
|
ps.refreshIPs()
|
|
|
|
|
|
|
|
case <-gcDeliveryRecords.C:
|
|
|
|
ps.gcDeliveryRecords()
|
|
|
|
|
2020-03-17 12:14:56 +00:00
|
|
|
case <-inspectScores:
|
|
|
|
ps.inspectScores()
|
|
|
|
|
2020-03-09 11:37:23 +00:00
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-17 12:14:56 +00:00
|
|
|
func (ps *peerScore) inspectScores() {
|
|
|
|
ps.Lock()
|
|
|
|
scores := make(map[peer.ID]float64, len(ps.peerStats))
|
|
|
|
for p := range ps.peerStats {
|
|
|
|
scores[p] = ps.score(p)
|
|
|
|
}
|
|
|
|
ps.Unlock()
|
|
|
|
|
|
|
|
ps.inspect(scores)
|
|
|
|
}
|
|
|
|
|
2020-03-07 19:09:50 +00:00
|
|
|
func (ps *peerScore) refreshScores() {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
for p, pstats := range ps.peerStats {
|
|
|
|
if !pstats.connected {
|
|
|
|
// has the retention period expired?
|
|
|
|
if now.After(pstats.expire) {
|
2020-03-08 18:59:49 +00:00
|
|
|
// yes, throw it away (but clean up the IP tracking first)
|
|
|
|
ps.removeIPs(p, pstats.ips)
|
2020-03-07 19:09:50 +00:00
|
|
|
delete(ps.peerStats, p)
|
|
|
|
}
|
|
|
|
|
|
|
|
// we don't decay retained scores, as the peer is not active.
|
|
|
|
// this way the peer cannot reset a negative score by simply disconnecting and reconnecting,
|
|
|
|
// unless the retention period has ellapsed.
|
|
|
|
// similarly, a well behaved peer does not lose its score by getting disconnected.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for topic, tstats := range pstats.topics {
|
|
|
|
// the topic parameters
|
|
|
|
topicParams, ok := ps.params.Topics[topic]
|
|
|
|
if !ok {
|
|
|
|
// we are not scoring this topic
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// decay counters
|
|
|
|
tstats.firstMessageDeliveries *= topicParams.FirstMessageDeliveriesDecay
|
2020-03-08 08:12:15 +00:00
|
|
|
if tstats.firstMessageDeliveries < ps.params.DecayToZero {
|
|
|
|
tstats.firstMessageDeliveries = 0
|
|
|
|
}
|
2020-03-07 19:09:50 +00:00
|
|
|
tstats.meshMessageDeliveries *= topicParams.MeshMessageDeliveriesDecay
|
2020-03-08 08:12:15 +00:00
|
|
|
if tstats.meshMessageDeliveries < ps.params.DecayToZero {
|
|
|
|
tstats.meshMessageDeliveries = 0
|
|
|
|
}
|
2020-03-08 22:08:38 +00:00
|
|
|
tstats.meshFailurePenalty *= topicParams.MeshFailurePenaltyDecay
|
|
|
|
if tstats.meshFailurePenalty < ps.params.DecayToZero {
|
|
|
|
tstats.meshFailurePenalty = 0
|
|
|
|
}
|
2020-03-07 19:09:50 +00:00
|
|
|
tstats.invalidMessageDeliveries *= topicParams.InvalidMessageDeliveriesDecay
|
2020-03-08 08:12:15 +00:00
|
|
|
if tstats.invalidMessageDeliveries < ps.params.DecayToZero {
|
|
|
|
tstats.invalidMessageDeliveries = 0
|
|
|
|
}
|
2020-03-07 19:09:50 +00:00
|
|
|
// update mesh time and activate mesh message delivery parameter if need be
|
|
|
|
if tstats.inMesh {
|
|
|
|
tstats.meshTime = now.Sub(tstats.graftTime)
|
|
|
|
if tstats.meshTime > topicParams.MeshMessageDeliveriesActivation {
|
|
|
|
tstats.meshMessageDeliveriesActive = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-09 11:37:23 +00:00
|
|
|
func (ps *peerScore) refreshIPs() {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
2020-03-07 19:09:50 +00:00
|
|
|
// peer IPs may change, so we periodically refresh them
|
2020-03-09 11:22:57 +00:00
|
|
|
for p, pstats := range ps.peerStats {
|
|
|
|
if pstats.connected {
|
|
|
|
ips := ps.getIPs(p)
|
|
|
|
ps.setIPs(p, ips, pstats.ips)
|
|
|
|
pstats.ips = ips
|
|
|
|
}
|
|
|
|
}
|
2020-03-07 19:09:50 +00:00
|
|
|
}
|
|
|
|
|
2020-03-09 11:37:23 +00:00
|
|
|
func (ps *peerScore) gcDeliveryRecords() {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
ps.deliveries.gc()
|
|
|
|
}
|
|
|
|
|
2020-03-01 12:29:24 +00:00
|
|
|
// tracer interface
|
|
|
|
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
|
2020-03-08 18:59:49 +00:00
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
pstats = &peerStats{topics: make(map[string]*topicStats)}
|
|
|
|
ps.peerStats[p] = pstats
|
|
|
|
}
|
|
|
|
|
|
|
|
pstats.connected = true
|
|
|
|
ips := ps.getIPs(p)
|
2020-03-09 11:22:57 +00:00
|
|
|
ps.setIPs(p, ips, pstats.ips)
|
2020-03-08 18:59:49 +00:00
|
|
|
pstats.ips = ips
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ps *peerScore) RemovePeer(p peer.ID) {
|
2020-03-08 18:59:49 +00:00
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-03-27 15:54:59 +00:00
|
|
|
// decide whether to retain the score; this currently only retains non-positive scores
|
|
|
|
// to dissuade attacks on the score function.
|
|
|
|
if ps.score(p) > 0 {
|
|
|
|
ps.removeIPs(p, pstats.ips)
|
|
|
|
delete(ps.peerStats, p)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// furthermore, when we decide to retain the score, the firstMessageDelivery counters are
|
|
|
|
// reset to 0 and mesh delivery penalties applied.
|
|
|
|
for topic, tstats := range pstats.topics {
|
|
|
|
tstats.firstMessageDeliveries = 0
|
|
|
|
|
|
|
|
threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
|
|
|
|
if tstats.inMesh && tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold {
|
|
|
|
deficit := threshold - tstats.meshMessageDeliveries
|
|
|
|
tstats.meshFailurePenalty += deficit * deficit
|
|
|
|
}
|
|
|
|
|
|
|
|
tstats.inMesh = false
|
|
|
|
}
|
|
|
|
|
2020-03-08 18:59:49 +00:00
|
|
|
pstats.connected = false
|
|
|
|
pstats.expire = time.Now().Add(ps.params.RetainScore)
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
// Join is a no-op; the body is intentionally empty.
func (ps *peerScore) Join(topic string) {}
|
|
|
|
// Leave is a no-op; the body is intentionally empty.
func (ps *peerScore) Leave(topic string) {}
|
|
|
|
|
|
|
|
func (ps *peerScore) Graft(p peer.ID, topic string) {
|
2020-03-08 18:21:16 +00:00
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
tstats, ok := pstats.getTopicStats(topic, ps.params)
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
tstats.inMesh = true
|
|
|
|
tstats.graftTime = time.Now()
|
|
|
|
tstats.meshTime = 0
|
2020-03-08 22:08:38 +00:00
|
|
|
tstats.meshMessageDeliveriesActive = false
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
func (ps *peerScore) Prune(p peer.ID, topic string) {
|
2020-03-08 18:21:16 +00:00
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
tstats, ok := pstats.getTopicStats(topic, ps.params)
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-03-08 23:17:11 +00:00
|
|
|
// sticky mesh delivery rate failure penalty
|
|
|
|
threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
|
|
|
|
if tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold {
|
|
|
|
deficit := threshold - tstats.meshMessageDeliveries
|
|
|
|
tstats.meshFailurePenalty += deficit * deficit
|
2020-03-08 22:08:38 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 18:21:16 +00:00
|
|
|
tstats.inMesh = false
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 20:11:26 +00:00
|
|
|
func (ps *peerScore) ValidateMessage(msg *Message) {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
// the pubsub subsystem is beginning validation; create a record to track time in
|
|
|
|
// the validation pipeline with an accurate firstSeen time.
|
|
|
|
_ = ps.deliveries.getRecord(ps.msgID(msg.Message))
|
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
func (ps *peerScore) DeliverMessage(msg *Message) {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
ps.markFirstMessageDelivery(msg.ReceivedFrom, msg)
|
|
|
|
|
|
|
|
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
|
|
|
|
|
2020-03-08 20:08:53 +00:00
|
|
|
// mark the message as valid and reward mesh peers that have already forwarded it to us
|
2020-03-08 14:48:45 +00:00
|
|
|
drec.status = delivery_valid
|
2020-03-08 20:44:45 +00:00
|
|
|
drec.validated = time.Now()
|
2020-03-08 20:08:53 +00:00
|
|
|
for p := range drec.peers {
|
2020-03-09 09:16:50 +00:00
|
|
|
// this check is to make sure a peer can't send us a message twice and get a double count
|
|
|
|
// if it is a first delivery.
|
|
|
|
if p != msg.ReceivedFrom {
|
|
|
|
ps.markDuplicateMessageDelivery(p, msg, time.Time{})
|
|
|
|
}
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
2020-03-08 19:43:28 +00:00
|
|
|
switch reason {
|
|
|
|
// we don't track those messages, but we penalize the peer as they are clearly invalid
|
2020-03-27 18:31:31 +00:00
|
|
|
case rejectMissingSignature:
|
2020-03-08 19:43:28 +00:00
|
|
|
fallthrough
|
2020-03-27 18:31:31 +00:00
|
|
|
case rejectInvalidSignature:
|
2020-03-08 14:48:45 +00:00
|
|
|
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
|
2020-03-08 19:43:28 +00:00
|
|
|
return
|
|
|
|
|
|
|
|
// we ignore those messages, so do nothing.
|
2020-03-27 18:31:31 +00:00
|
|
|
case rejectBlacklstedPeer:
|
2020-03-08 19:43:28 +00:00
|
|
|
fallthrough
|
2020-03-27 18:31:31 +00:00
|
|
|
case rejectBlacklistedSource:
|
2020-03-08 14:48:45 +00:00
|
|
|
return
|
2020-03-16 16:24:37 +00:00
|
|
|
|
2020-03-27 18:31:31 +00:00
|
|
|
case rejectValidationQueueFull:
|
2020-03-16 16:24:37 +00:00
|
|
|
// the message was rejected before it entered the validation pipeline;
|
|
|
|
// we don't know if this message has a valid signature, and thus we also don't know if
|
|
|
|
// it has a valid message ID; all we can do is ignore it.
|
|
|
|
return
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-27 18:31:31 +00:00
|
|
|
if reason == rejectValidationThrottled {
|
2020-03-08 14:48:45 +00:00
|
|
|
// if we reject with "validation throttled" we don't penalize the peer(s) that forward it
|
|
|
|
// because we don't know if it was valid.
|
|
|
|
drec.status = delivery_throttled
|
|
|
|
// release the delivery time tracking map to free some memory early
|
|
|
|
drec.peers = nil
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// mark the message as invalid and penalize peers that have already forwarded it.
|
|
|
|
drec.status = delivery_invalid
|
|
|
|
for p := range drec.peers {
|
|
|
|
ps.markInvalidMessageDelivery(p, msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
// release the delivery time tracking map to free some memory early
|
|
|
|
drec.peers = nil
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
func (ps *peerScore) DuplicateMessage(msg *Message) {
|
|
|
|
ps.Lock()
|
|
|
|
defer ps.Unlock()
|
|
|
|
|
|
|
|
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
|
|
|
|
|
|
|
|
_, ok := drec.peers[msg.ReceivedFrom]
|
|
|
|
if ok {
|
|
|
|
// we have already seen this duplicate!
|
|
|
|
return
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
switch drec.status {
|
|
|
|
case delivery_unknown:
|
2020-03-08 20:08:53 +00:00
|
|
|
// the message is being validated; track the peer delivery and wait for
|
2020-03-08 14:48:45 +00:00
|
|
|
// the Deliver/Reject notification.
|
2020-03-08 20:08:53 +00:00
|
|
|
drec.peers[msg.ReceivedFrom] = struct{}{}
|
2020-03-08 14:48:45 +00:00
|
|
|
|
|
|
|
case delivery_valid:
|
|
|
|
// mark the peer delivery time to only count a duplicate delivery once.
|
2020-03-08 20:08:53 +00:00
|
|
|
drec.peers[msg.ReceivedFrom] = struct{}{}
|
2020-03-08 20:44:45 +00:00
|
|
|
ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.validated)
|
2020-03-08 14:48:45 +00:00
|
|
|
|
|
|
|
case delivery_invalid:
|
|
|
|
// we no longer track delivery time
|
|
|
|
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
|
|
|
|
|
|
|
|
case delivery_throttled:
|
2020-03-09 09:16:50 +00:00
|
|
|
// the message was throttled; do nothing (we don't know if it was valid)
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
// message delivery records
|
|
|
|
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
|
2020-03-09 11:07:37 +00:00
|
|
|
rec, ok := d.records[id]
|
|
|
|
if ok {
|
|
|
|
return rec
|
|
|
|
}
|
|
|
|
|
|
|
|
rec = &deliveryRecord{peers: make(map[peer.ID]struct{})}
|
|
|
|
d.records[id] = rec
|
|
|
|
|
|
|
|
entry := &deliveryEntry{id: id, expire: time.Now().Add(TimeCacheDuration)}
|
|
|
|
if d.tail != nil {
|
|
|
|
d.tail.next = entry
|
|
|
|
d.tail = entry
|
|
|
|
} else {
|
|
|
|
d.head = entry
|
|
|
|
d.tail = entry
|
|
|
|
}
|
|
|
|
|
|
|
|
return rec
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *messageDeliveries) gc() {
|
|
|
|
if d.head == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
for d.head != nil && now.After(d.head.expire) {
|
|
|
|
delete(d.records, d.head.id)
|
|
|
|
d.head = d.head.next
|
|
|
|
}
|
|
|
|
|
|
|
|
if d.head == nil {
|
|
|
|
d.tail = nil
|
|
|
|
}
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// utilities
|
|
|
|
func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) {
|
|
|
|
tstats, ok := pstats.topics[topic]
|
|
|
|
if ok {
|
|
|
|
return tstats, true
|
|
|
|
}
|
|
|
|
|
|
|
|
_, scoredTopic := params.Topics[topic]
|
|
|
|
if !scoredTopic {
|
|
|
|
return nil, false
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
tstats = &topicStats{}
|
|
|
|
pstats.topics[topic] = tstats
|
|
|
|
|
|
|
|
return tstats, true
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
|
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, topic := range msg.GetTopicIDs() {
|
|
|
|
tstats, ok := pstats.getTopicStats(topic, ps.params)
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
tstats.invalidMessageDeliveries += 1
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
|
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, topic := range msg.GetTopicIDs() {
|
|
|
|
tstats, ok := pstats.getTopicStats(topic, ps.params)
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-03-08 15:22:16 +00:00
|
|
|
cap := ps.params.Topics[topic].FirstMessageDeliveriesCap
|
2020-03-08 14:48:45 +00:00
|
|
|
tstats.firstMessageDeliveries += 1
|
2020-03-08 15:22:16 +00:00
|
|
|
if tstats.firstMessageDeliveries > cap {
|
|
|
|
tstats.firstMessageDeliveries = cap
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if !tstats.inMesh {
|
|
|
|
continue
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 15:22:16 +00:00
|
|
|
cap = ps.params.Topics[topic].MeshMessageDeliveriesCap
|
2020-03-08 14:48:45 +00:00
|
|
|
tstats.meshMessageDeliveries += 1
|
2020-03-08 15:22:16 +00:00
|
|
|
if tstats.meshMessageDeliveries > cap {
|
|
|
|
tstats.meshMessageDeliveries = cap
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
|
|
|
|
2020-03-08 20:44:45 +00:00
|
|
|
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
|
2020-03-08 20:08:53 +00:00
|
|
|
var now time.Time
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
pstats, ok := ps.peerStats[p]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-03-08 20:44:45 +00:00
|
|
|
if !validated.IsZero() {
|
2020-03-08 20:08:53 +00:00
|
|
|
now = time.Now()
|
|
|
|
}
|
|
|
|
|
2020-03-08 14:48:45 +00:00
|
|
|
for _, topic := range msg.GetTopicIDs() {
|
|
|
|
tstats, ok := pstats.getTopicStats(topic, ps.params)
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if !tstats.inMesh {
|
|
|
|
continue
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
|
2020-03-08 20:44:45 +00:00
|
|
|
// check against the mesh delivery window -- if the validated time is passed as 0, then
|
|
|
|
// the message was received before we finished validation and thus falls within the mesh
|
|
|
|
// delivery window.
|
|
|
|
if !validated.IsZero() && now.After(validated.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) {
|
2020-03-08 14:48:45 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-03-08 15:22:16 +00:00
|
|
|
cap := ps.params.Topics[topic].MeshMessageDeliveriesCap
|
2020-03-08 14:48:45 +00:00
|
|
|
tstats.meshMessageDeliveries += 1
|
2020-03-08 15:22:16 +00:00
|
|
|
if tstats.meshMessageDeliveries > cap {
|
|
|
|
tstats.meshMessageDeliveries = cap
|
2020-03-08 14:48:45 +00:00
|
|
|
}
|
|
|
|
}
|
2020-03-01 12:29:24 +00:00
|
|
|
}
|
2020-03-08 18:59:49 +00:00
|
|
|
|
|
|
|
// gets the current IPs for a peer
|
|
|
|
func (ps *peerScore) getIPs(p peer.ID) []string {
|
2020-03-09 11:22:57 +00:00
|
|
|
// in unit tests this can be nil
|
|
|
|
if ps.host == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
conns := ps.host.Network().ConnsToPeer(p)
|
|
|
|
res := make([]string, 0, len(conns))
|
|
|
|
for _, c := range conns {
|
|
|
|
remote := c.RemoteMultiaddr()
|
2020-03-27 18:18:38 +00:00
|
|
|
ip, err := manet.ToIP(remote)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
2020-03-09 11:22:57 +00:00
|
|
|
|
2020-03-27 18:18:38 +00:00
|
|
|
if len(ip) == 4 {
|
|
|
|
// IPv4 address
|
|
|
|
ip4 := ip.String()
|
2020-03-09 11:22:57 +00:00
|
|
|
res = append(res, ip4)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-03-27 18:18:38 +00:00
|
|
|
if len(ip) == 16 {
|
|
|
|
// IPv6 address -- we add both the actual address and the /64 subnet
|
|
|
|
ip6 := ip.String()
|
2020-03-09 11:22:57 +00:00
|
|
|
res = append(res, ip6)
|
2020-03-27 18:18:38 +00:00
|
|
|
|
|
|
|
ip6mask := ip.Mask(net.CIDRMask(64, 128)).String()
|
|
|
|
res = append(res, ip6mask)
|
2020-03-09 11:22:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return res
|
2020-03-08 18:59:49 +00:00
|
|
|
}
|
|
|
|
|
2020-03-09 11:22:57 +00:00
|
|
|
// adds tracking for the new IPs in the list, and removes tracking from the obsolete ips.
|
|
|
|
func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) {
|
2020-03-08 18:59:49 +00:00
|
|
|
addNewIPs:
|
|
|
|
// add the new IPs to the tracking
|
|
|
|
for _, ip := range newips {
|
|
|
|
// check if it is in the old ips list
|
|
|
|
for _, xip := range oldips {
|
|
|
|
if ip == xip {
|
|
|
|
continue addNewIPs
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// no, it's a new one -- add it to the tracker
|
|
|
|
peers, ok := ps.peerIPs[ip]
|
|
|
|
if !ok {
|
|
|
|
peers = make(map[peer.ID]struct{})
|
|
|
|
ps.peerIPs[ip] = peers
|
|
|
|
}
|
|
|
|
peers[p] = struct{}{}
|
|
|
|
}
|
|
|
|
|
|
|
|
removeOldIPs:
|
|
|
|
// remove the obsolete old IPs from the tracking
|
|
|
|
for _, ip := range oldips {
|
|
|
|
// check if it is in the new ips list
|
|
|
|
for _, xip := range newips {
|
|
|
|
if ip == xip {
|
|
|
|
continue removeOldIPs
|
|
|
|
}
|
|
|
|
// no, it's obsolete -- remove it from the tracker
|
|
|
|
peers, ok := ps.peerIPs[ip]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
delete(peers, p)
|
|
|
|
if len(peers) == 0 {
|
|
|
|
delete(ps.peerIPs, ip)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// removes an IP list from the tracking list
|
|
|
|
func (ps *peerScore) removeIPs(p peer.ID, ips []string) {
|
|
|
|
for _, ip := range ips {
|
|
|
|
peers, ok := ps.peerIPs[ip]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
delete(peers, p)
|
|
|
|
if len(peers) == 0 {
|
|
|
|
delete(ps.peerIPs, ip)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|