2020-03-01 12:29:24 +00:00
package pubsub
import (
2020-03-09 11:37:23 +00:00
"context"
2020-03-09 12:08:42 +00:00
"fmt"
2020-03-27 18:18:38 +00:00
"net"
2020-03-07 17:43:01 +00:00
"sync"
2020-03-07 13:46:41 +00:00
"time"
2020-03-09 11:22:57 +00:00
"github.com/libp2p/go-libp2p-core/host"
2020-03-01 12:29:24 +00:00
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
2020-03-09 11:22:57 +00:00
2020-09-02 22:32:05 +00:00
manet "github.com/multiformats/go-multiaddr/net"
2020-03-01 12:29:24 +00:00
)
2020-03-07 17:43:01 +00:00
type peerStats struct {
// true if the peer is currently connected
connected bool
// expiration time of the score stats for disconnected peers
expire time . Time
// per topc stats
topics map [ string ] * topicStats
// IP tracking; store as string for easy processing
ips [ ] string
2020-05-01 18:38:32 +00:00
2021-02-12 18:16:19 +00:00
// IP whitelisting cache
ipWhitelist map [ string ] bool
2020-05-01 18:38:32 +00:00
// behavioural pattern penalties (applied by the router)
behaviourPenalty float64
2020-03-07 17:43:01 +00:00
}
// topicStats holds the per-topic counters tracked for a single peer.
type topicStats struct {
	// inMesh is true while the peer is in the mesh for the topic.
	inMesh bool

	// graftTime is the time the peer was (last) GRAFTed; valid only while
	// the peer is in the mesh.
	graftTime time.Time

	// meshTime is the time spent in the mesh. It is updated during
	// refresh/decay so that computing a score does not need to read the clock.
	meshTime time.Duration

	// firstMessageDeliveries counts first message deliveries.
	firstMessageDeliveries float64

	// meshMessageDeliveries counts mesh message deliveries.
	meshMessageDeliveries float64

	// meshMessageDeliveriesActive is true once the peer has spent enough
	// time in the mesh to activate mesh message delivery scoring.
	meshMessageDeliveriesActive bool

	// meshFailurePenalty is the sticky mesh rate failure penalty counter.
	meshFailurePenalty float64

	// invalidMessageDeliveries counts invalid messages.
	invalidMessageDeliveries float64
}
2020-03-07 13:46:41 +00:00
type peerScore struct {
2020-03-07 17:43:01 +00:00
sync . Mutex
// the score parameters
params * PeerScoreParams
// per peer stats for score calculation
peerStats map [ peer . ID ] * peerStats
2020-04-01 22:20:53 +00:00
// IP colocation tracking; maps IP => set of peers.
2020-03-07 19:11:45 +00:00
peerIPs map [ string ] map [ peer . ID ] struct { }
2020-03-08 14:48:45 +00:00
// message delivery tracking
deliveries * messageDeliveries
msgID MsgIdFunction
2020-03-09 11:22:57 +00:00
host host . Host
2020-03-17 12:14:56 +00:00
// debugging inspection
inspect PeerScoreInspectFn
2020-07-22 18:26:44 +00:00
inspectEx ExtendedPeerScoreInspectFn
2020-03-17 12:14:56 +00:00
inspectPeriod time . Duration
2020-03-08 14:48:45 +00:00
}
2020-05-07 17:58:44 +00:00
var _ internalTracer = ( * peerScore ) ( nil )
2020-04-01 22:20:53 +00:00
2020-03-08 14:48:45 +00:00
type messageDeliveries struct {
2020-03-09 11:07:37 +00:00
records map [ string ] * deliveryRecord
// queue for cleaning up old delivery records
head * deliveryEntry
tail * deliveryEntry
2020-03-08 14:48:45 +00:00
}
type deliveryRecord struct {
status int
2020-05-04 06:56:51 +00:00
firstSeen time . Time
2020-03-08 20:44:45 +00:00
validated time . Time
2020-03-08 20:08:53 +00:00
peers map [ peer . ID ] struct { }
2020-03-01 12:29:24 +00:00
}
2020-03-09 11:07:37 +00:00
// deliveryEntry is a node of the delivery record clean-up queue.
type deliveryEntry struct {
	// id is the message ID of the record this entry refers to.
	id string

	// expire is the time after which the record may be garbage collected.
	expire time.Time

	// next links to the following entry in the queue, or nil at the tail.
	next *deliveryEntry
}
2020-03-08 14:48:45 +00:00
// delivery record status
const (
	// deliveryUnknown: we don't know (yet) if the message is valid.
	deliveryUnknown = iota

	// deliveryValid: we know the message is valid.
	deliveryValid

	// deliveryInvalid: we know the message is invalid.
	deliveryInvalid

	// deliveryIgnored: we were instructed by the validator to ignore the message.
	deliveryIgnored

	// deliveryThrottled: we can't tell if it is valid because validation throttled.
	deliveryThrottled
)
2020-07-29 17:04:52 +00:00
type (
	// PeerScoreInspectFn is the simple inspector signature: it receives a
	// map of peer IDs to their score.
	PeerScoreInspectFn = func(map[peer.ID]float64)

	// ExtendedPeerScoreInspectFn is the extended inspector signature: it
	// receives a map of peer IDs to a snapshot of their score components.
	ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot)
)
2020-07-22 18:26:44 +00:00
type PeerScoreSnapshot struct {
Score float64
Topics map [ string ] * TopicScoreSnapshot
AppSpecificScore float64
2020-07-29 17:18:55 +00:00
IPColocationFactor float64
2020-07-22 18:26:44 +00:00
BehaviourPenalty float64
}
// TopicScoreSnapshot is a copy of the per-topic counters tracked for a peer,
// as exposed through PeerScoreSnapshot.
type TopicScoreSnapshot struct {
	// TimeInMesh is the time the peer has spent in the mesh for the topic.
	TimeInMesh time.Duration

	// FirstMessageDeliveries is the first message deliveries counter.
	FirstMessageDeliveries float64

	// MeshMessageDeliveries is the mesh message deliveries counter.
	MeshMessageDeliveries float64

	// InvalidMessageDeliveries is the invalid message counter.
	InvalidMessageDeliveries float64
}
2020-03-17 12:14:56 +00:00
2020-03-27 12:10:12 +00:00
// WithPeerScoreInspect is a gossipsub router option that enables peer score debugging.
2020-03-17 12:14:56 +00:00
// When this option is enabled, the supplied function will be invoked periodically to allow
2020-07-22 18:26:44 +00:00
// the application to inspect or dump the scores for connected peers.
// The supplied function can have one of two signatures:
// - PeerScoreInspectFn, which takes a map of peer IDs to score.
// - ExtendedPeerScoreInspectFn, which takes a map of peer IDs to
// PeerScoreSnapshots and allows inspection of individual score
// components for debugging peer scoring.
2020-03-17 12:14:56 +00:00
// This option must be passed _after_ the WithPeerScore option.
2020-07-22 18:26:44 +00:00
func WithPeerScoreInspect ( inspect interface { } , period time . Duration ) Option {
2020-03-17 12:14:56 +00:00
return func ( ps * PubSub ) error {
2021-03-24 13:42:37 +00:00
gs , ok := ps . rt . ( * WakuRelaySubRouter )
2020-03-17 12:14:56 +00:00
if ! ok {
2021-03-24 13:42:37 +00:00
return fmt . Errorf ( "pubsub router is not wakurelaysub" )
2020-03-17 12:14:56 +00:00
}
if gs . score == nil {
return fmt . Errorf ( "peer scoring is not enabled" )
}
2020-07-29 17:09:06 +00:00
if gs . score . inspect != nil || gs . score . inspectEx != nil {
return fmt . Errorf ( "duplicate peer score inspector" )
}
2020-07-22 18:26:44 +00:00
switch i := inspect . ( type ) {
case PeerScoreInspectFn :
gs . score . inspect = i
case ExtendedPeerScoreInspectFn :
gs . score . inspectEx = i
default :
return fmt . Errorf ( "unknown peer score insector type: %v" , inspect )
}
2020-03-17 12:14:56 +00:00
gs . score . inspectPeriod = period
return nil
}
}
// implementation
2020-03-07 15:59:22 +00:00
func newPeerScore ( params * PeerScoreParams ) * peerScore {
2020-03-09 11:37:23 +00:00
return & peerScore {
params : params ,
peerStats : make ( map [ peer . ID ] * peerStats ) ,
peerIPs : make ( map [ string ] map [ peer . ID ] struct { } ) ,
deliveries : & messageDeliveries { records : make ( map [ string ] * deliveryRecord ) } ,
msgID : DefaultMsgIdFn ,
}
2020-03-01 12:29:24 +00:00
}
2020-09-09 12:15:41 +00:00
// SetTopicScoreParams sets new score parameters for a topic.
// If the topic previously had parameters and the parameters are lowering delivery caps,
// then the score counters are recapped appropriately.
// Note: assumes that the topic score parameters have already been validated
2020-08-12 09:29:49 +00:00
func ( ps * peerScore ) SetTopicScoreParams ( topic string , p * TopicScoreParams ) error {
ps . Lock ( )
defer ps . Unlock ( )
old , exist := ps . params . Topics [ topic ]
ps . params . Topics [ topic ] = p
if ! exist {
return nil
}
// check to see if the counter Caps are being lowered; if that's the case we need to recap them
recap := false
if p . FirstMessageDeliveriesCap < old . FirstMessageDeliveriesCap {
recap = true
}
if p . MeshMessageDeliveriesCap < old . MeshMessageDeliveriesCap {
recap = true
}
if ! recap {
return nil
}
// recap counters for topic
for _ , pstats := range ps . peerStats {
tstats , ok := pstats . topics [ topic ]
if ! ok {
continue
}
if tstats . firstMessageDeliveries > p . FirstMessageDeliveriesCap {
tstats . firstMessageDeliveries = p . FirstMessageDeliveriesCap
}
if tstats . meshMessageDeliveries > p . MeshMessageDeliveriesCap {
tstats . meshMessageDeliveries = p . MeshMessageDeliveriesCap
}
}
return nil
}
2020-03-01 12:29:24 +00:00
// router interface
2021-03-24 13:42:37 +00:00
func ( ps * peerScore ) Start ( gs * WakuRelaySubRouter ) {
2020-03-16 19:27:52 +00:00
if ps == nil {
return
}
2020-03-09 11:37:23 +00:00
ps . msgID = gs . p . msgID
ps . host = gs . p . host
go ps . background ( gs . p . ctx )
2020-03-07 15:59:22 +00:00
}
2020-03-02 13:05:24 +00:00
func ( ps * peerScore ) Score ( p peer . ID ) float64 {
2020-03-07 23:54:40 +00:00
if ps == nil {
return 0
}
2020-03-07 17:43:01 +00:00
ps . Lock ( )
defer ps . Unlock ( )
2020-03-17 12:14:56 +00:00
return ps . score ( p )
}
func ( ps * peerScore ) score ( p peer . ID ) float64 {
2020-03-07 17:43:01 +00:00
pstats , ok := ps . peerStats [ p ]
if ! ok {
return 0
}
var score float64
// topic scores
for topic , tstats := range pstats . topics {
// the topic parameters
topicParams , ok := ps . params . Topics [ topic ]
if ! ok {
// we are not scoring this topic
continue
}
2020-04-01 22:20:53 +00:00
// the topic score
var topicScore float64
2020-03-07 17:43:01 +00:00
// P1: time in Mesh
if tstats . inMesh {
p1 := float64 ( tstats . meshTime / topicParams . TimeInMeshQuantum )
2020-03-08 08:49:19 +00:00
if p1 > topicParams . TimeInMeshCap {
p1 = topicParams . TimeInMeshCap
}
2020-03-07 17:43:01 +00:00
topicScore += p1 * topicParams . TimeInMeshWeight
}
// P2: first message deliveries
p2 := tstats . firstMessageDeliveries
topicScore += p2 * topicParams . FirstMessageDeliveriesWeight
// P3: mesh message deliveries
if tstats . meshMessageDeliveriesActive {
if tstats . meshMessageDeliveries < topicParams . MeshMessageDeliveriesThreshold {
deficit := topicParams . MeshMessageDeliveriesThreshold - tstats . meshMessageDeliveries
p3 := deficit * deficit
topicScore += p3 * topicParams . MeshMessageDeliveriesWeight
}
}
2020-03-08 22:08:38 +00:00
// P3b:
2020-04-01 22:20:53 +00:00
// NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so this detracts.
2020-03-08 22:08:38 +00:00
p3b := tstats . meshFailurePenalty
topicScore += p3b * topicParams . MeshFailurePenaltyWeight
2020-03-07 17:43:01 +00:00
// P4: invalid messages
2020-04-01 22:20:53 +00:00
// NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so this detracts.
2020-05-08 16:59:33 +00:00
p4 := ( tstats . invalidMessageDeliveries * tstats . invalidMessageDeliveries )
2020-03-07 17:43:01 +00:00
topicScore += p4 * topicParams . InvalidMessageDeliveriesWeight
// update score, mixing with topic weight
score += topicScore * topicParams . TopicWeight
}
2020-03-27 14:46:41 +00:00
// apply the topic score cap, if any
if ps . params . TopicScoreCap > 0 && score > ps . params . TopicScoreCap {
score = ps . params . TopicScoreCap
}
2020-03-07 17:43:01 +00:00
// P5: application-specific score
p5 := ps . params . AppSpecificScore ( p )
score += p5 * ps . params . AppSpecificWeight
// P6: IP collocation factor
2020-07-29 17:18:55 +00:00
p6 := ps . ipColocationFactor ( p )
score += p6 * ps . params . IPColocationFactorWeight
// P7: behavioural pattern penalty
2020-08-20 14:40:36 +00:00
if pstats . behaviourPenalty > ps . params . BehaviourPenaltyThreshold {
excess := pstats . behaviourPenalty - ps . params . BehaviourPenaltyThreshold
p7 := excess * excess
score += p7 * ps . params . BehaviourPenaltyWeight
}
2020-07-29 17:18:55 +00:00
return score
}
func ( ps * peerScore ) ipColocationFactor ( p peer . ID ) float64 {
pstats , ok := ps . peerStats [ p ]
if ! ok {
return 0
}
var result float64
2021-02-09 08:17:30 +00:00
loop :
2020-03-07 17:43:01 +00:00
for _ , ip := range pstats . ips {
2021-02-09 08:23:47 +00:00
if len ( ps . params . IPColocationFactorWhitelist ) > 0 {
2021-02-12 18:16:19 +00:00
if pstats . ipWhitelist == nil {
pstats . ipWhitelist = make ( map [ string ] bool )
}
whitelisted , ok := pstats . ipWhitelist [ ip ]
if ! ok {
ipObj := net . ParseIP ( ip )
for _ , ipNet := range ps . params . IPColocationFactorWhitelist {
if ipNet . Contains ( ipObj ) {
pstats . ipWhitelist [ ip ] = true
continue loop
}
2021-02-09 08:17:30 +00:00
}
2021-02-12 18:16:19 +00:00
pstats . ipWhitelist [ ip ] = false
}
if whitelisted {
continue loop
2021-02-09 08:17:30 +00:00
}
}
2020-04-01 22:20:53 +00:00
// P6 has a cliff (IPColocationFactorThreshold); it's only applied iff
// at least that many peers are connected to us from that source IP
// addr. It is quadratic, and the weight is negative (validated by
// PeerScoreParams.validate).
2020-03-07 17:43:01 +00:00
peersInIP := len ( ps . peerIPs [ ip ] )
if peersInIP > ps . params . IPColocationFactorThreshold {
surpluss := float64 ( peersInIP - ps . params . IPColocationFactorThreshold )
2020-07-29 17:18:55 +00:00
result += surpluss * surpluss
2020-03-07 17:43:01 +00:00
}
}
2020-07-29 17:18:55 +00:00
return result
2020-03-01 12:29:24 +00:00
}
2020-05-01 18:38:32 +00:00
// behavioural pattern penalties

// AddPenalty adds count to the behavioural penalty counter of peer p.
// It does nothing on a nil receiver or when the peer has no stats.
func (ps *peerScore) AddPenalty(p peer.ID, count int) {
	if ps == nil {
		return
	}

	ps.Lock()
	defer ps.Unlock()

	if stats, tracked := ps.peerStats[p]; tracked {
		stats.behaviourPenalty += float64(count)
	}
}
2020-03-07 19:09:50 +00:00
// periodic maintenance
2020-03-09 11:37:23 +00:00
func ( ps * peerScore ) background ( ctx context . Context ) {
refreshScores := time . NewTicker ( ps . params . DecayInterval )
defer refreshScores . Stop ( )
refreshIPs := time . NewTicker ( time . Minute )
defer refreshIPs . Stop ( )
gcDeliveryRecords := time . NewTicker ( time . Minute )
defer gcDeliveryRecords . Stop ( )
2020-03-17 12:14:56 +00:00
var inspectScores <- chan time . Time
2020-07-22 18:26:44 +00:00
if ps . inspect != nil || ps . inspectEx != nil {
2020-03-17 12:14:56 +00:00
ticker := time . NewTicker ( ps . inspectPeriod )
defer ticker . Stop ( )
2020-03-27 12:10:12 +00:00
// also dump at exit for one final sample
defer ps . inspectScores ( )
2020-03-17 12:14:56 +00:00
inspectScores = ticker . C
}
2020-03-09 11:37:23 +00:00
for {
select {
case <- refreshScores . C :
ps . refreshScores ( )
case <- refreshIPs . C :
ps . refreshIPs ( )
case <- gcDeliveryRecords . C :
ps . gcDeliveryRecords ( )
2020-03-17 12:14:56 +00:00
case <- inspectScores :
ps . inspectScores ( )
2020-03-09 11:37:23 +00:00
case <- ctx . Done ( ) :
return
}
}
}
2020-04-01 22:20:53 +00:00
// inspectScores dumps all tracked scores into the inspect function.
2020-03-17 12:14:56 +00:00
func ( ps * peerScore ) inspectScores ( ) {
2020-07-22 18:26:44 +00:00
if ps . inspect != nil {
ps . inspectScoresSimple ( )
2020-07-29 17:09:06 +00:00
}
if ps . inspectEx != nil {
2020-07-22 18:26:44 +00:00
ps . inspectScoresExtended ( )
}
}
func ( ps * peerScore ) inspectScoresSimple ( ) {
2020-03-17 12:14:56 +00:00
ps . Lock ( )
scores := make ( map [ peer . ID ] float64 , len ( ps . peerStats ) )
for p := range ps . peerStats {
scores [ p ] = ps . score ( p )
}
ps . Unlock ( )
2020-04-01 22:20:53 +00:00
// Since this is a user-injected function, it could be performing I/O, and
// we don't want to block the scorer's background loop. Therefore, we launch
// it in a separate goroutine. If the function needs to synchronise, it
// should do so locally.
go ps . inspect ( scores )
2020-03-17 12:14:56 +00:00
}
2020-07-22 18:26:44 +00:00
func ( ps * peerScore ) inspectScoresExtended ( ) {
ps . Lock ( )
scores := make ( map [ peer . ID ] * PeerScoreSnapshot , len ( ps . peerStats ) )
for p , pstats := range ps . peerStats {
pss := new ( PeerScoreSnapshot )
pss . Score = ps . score ( p )
if len ( pstats . topics ) > 0 {
pss . Topics = make ( map [ string ] * TopicScoreSnapshot , len ( pstats . topics ) )
for t , ts := range pstats . topics {
2020-07-29 17:10:51 +00:00
tss := & TopicScoreSnapshot {
2020-07-22 18:26:44 +00:00
FirstMessageDeliveries : ts . firstMessageDeliveries ,
MeshMessageDeliveries : ts . meshMessageDeliveries ,
InvalidMessageDeliveries : ts . invalidMessageDeliveries ,
}
if ts . inMesh {
2020-07-29 17:10:51 +00:00
tss . TimeInMesh = ts . meshTime
2020-07-22 18:26:44 +00:00
}
2020-07-29 17:10:51 +00:00
pss . Topics [ t ] = tss
2020-07-22 18:26:44 +00:00
}
}
pss . AppSpecificScore = ps . params . AppSpecificScore ( p )
2020-07-29 17:18:55 +00:00
pss . IPColocationFactor = ps . ipColocationFactor ( p )
2020-07-22 18:26:44 +00:00
pss . BehaviourPenalty = pstats . behaviourPenalty
scores [ p ] = pss
}
ps . Unlock ( )
go ps . inspectEx ( scores )
}
2020-04-01 22:20:53 +00:00
// refreshScores decays scores, and purges score records for disconnected peers,
// once their expiry has elapsed.
2020-03-07 19:09:50 +00:00
func ( ps * peerScore ) refreshScores ( ) {
ps . Lock ( )
defer ps . Unlock ( )
now := time . Now ( )
for p , pstats := range ps . peerStats {
if ! pstats . connected {
// has the retention period expired?
if now . After ( pstats . expire ) {
2020-03-08 18:59:49 +00:00
// yes, throw it away (but clean up the IP tracking first)
ps . removeIPs ( p , pstats . ips )
2020-03-07 19:09:50 +00:00
delete ( ps . peerStats , p )
}
// we don't decay retained scores, as the peer is not active.
// this way the peer cannot reset a negative score by simply disconnecting and reconnecting,
// unless the retention period has ellapsed.
// similarly, a well behaved peer does not lose its score by getting disconnected.
continue
}
for topic , tstats := range pstats . topics {
// the topic parameters
topicParams , ok := ps . params . Topics [ topic ]
if ! ok {
// we are not scoring this topic
continue
}
// decay counters
tstats . firstMessageDeliveries *= topicParams . FirstMessageDeliveriesDecay
2020-03-08 08:12:15 +00:00
if tstats . firstMessageDeliveries < ps . params . DecayToZero {
tstats . firstMessageDeliveries = 0
}
2020-03-07 19:09:50 +00:00
tstats . meshMessageDeliveries *= topicParams . MeshMessageDeliveriesDecay
2020-03-08 08:12:15 +00:00
if tstats . meshMessageDeliveries < ps . params . DecayToZero {
tstats . meshMessageDeliveries = 0
}
2020-03-08 22:08:38 +00:00
tstats . meshFailurePenalty *= topicParams . MeshFailurePenaltyDecay
if tstats . meshFailurePenalty < ps . params . DecayToZero {
tstats . meshFailurePenalty = 0
}
2020-03-07 19:09:50 +00:00
tstats . invalidMessageDeliveries *= topicParams . InvalidMessageDeliveriesDecay
2020-03-08 08:12:15 +00:00
if tstats . invalidMessageDeliveries < ps . params . DecayToZero {
tstats . invalidMessageDeliveries = 0
}
2020-03-07 19:09:50 +00:00
// update mesh time and activate mesh message delivery parameter if need be
if tstats . inMesh {
tstats . meshTime = now . Sub ( tstats . graftTime )
if tstats . meshTime > topicParams . MeshMessageDeliveriesActivation {
tstats . meshMessageDeliveriesActive = true
}
}
}
2020-05-01 18:38:32 +00:00
// decay P7 counter
pstats . behaviourPenalty *= ps . params . BehaviourPenaltyDecay
if pstats . behaviourPenalty < ps . params . DecayToZero {
pstats . behaviourPenalty = 0
}
2020-03-07 19:09:50 +00:00
}
}
2020-04-01 22:20:53 +00:00
// refreshIPs refreshes IPs we know of peers we're tracking.
2020-03-09 11:37:23 +00:00
func ( ps * peerScore ) refreshIPs ( ) {
ps . Lock ( )
defer ps . Unlock ( )
2020-03-07 19:09:50 +00:00
// peer IPs may change, so we periodically refresh them
2020-04-01 22:20:53 +00:00
//
// TODO: it could be more efficient to collect connections for all peers
// from the Network, populate a new map, and replace it in place. We are
// incurring in those allocs anyway, and maybe even in more, in the form of
// slices.
2020-03-09 11:22:57 +00:00
for p , pstats := range ps . peerStats {
if pstats . connected {
ips := ps . getIPs ( p )
ps . setIPs ( p , ips , pstats . ips )
pstats . ips = ips
}
}
2020-03-07 19:09:50 +00:00
}
2020-03-09 11:37:23 +00:00
// gcDeliveryRecords garbage collects expired delivery records while holding
// the scorer's lock.
func (ps *peerScore) gcDeliveryRecords() {
	ps.Lock()
	defer ps.Unlock()

	records := ps.deliveries
	records.gc()
}
2020-03-01 12:29:24 +00:00
// tracer interface
func ( ps * peerScore ) AddPeer ( p peer . ID , proto protocol . ID ) {
2020-03-08 18:59:49 +00:00
ps . Lock ( )
defer ps . Unlock ( )
pstats , ok := ps . peerStats [ p ]
if ! ok {
pstats = & peerStats { topics : make ( map [ string ] * topicStats ) }
ps . peerStats [ p ] = pstats
}
pstats . connected = true
ips := ps . getIPs ( p )
2020-03-09 11:22:57 +00:00
ps . setIPs ( p , ips , pstats . ips )
2020-03-08 18:59:49 +00:00
pstats . ips = ips
2020-03-01 12:29:24 +00:00
}
func ( ps * peerScore ) RemovePeer ( p peer . ID ) {
2020-03-08 18:59:49 +00:00
ps . Lock ( )
defer ps . Unlock ( )
pstats , ok := ps . peerStats [ p ]
if ! ok {
return
}
2020-03-27 15:54:59 +00:00
// decide whether to retain the score; this currently only retains non-positive scores
// to dissuade attacks on the score function.
if ps . score ( p ) > 0 {
ps . removeIPs ( p , pstats . ips )
delete ( ps . peerStats , p )
return
}
// furthermore, when we decide to retain the score, the firstMessageDelivery counters are
// reset to 0 and mesh delivery penalties applied.
for topic , tstats := range pstats . topics {
tstats . firstMessageDeliveries = 0
threshold := ps . params . Topics [ topic ] . MeshMessageDeliveriesThreshold
if tstats . inMesh && tstats . meshMessageDeliveriesActive && tstats . meshMessageDeliveries < threshold {
deficit := threshold - tstats . meshMessageDeliveries
tstats . meshFailurePenalty += deficit * deficit
}
tstats . inMesh = false
}
2020-03-08 18:59:49 +00:00
pstats . connected = false
pstats . expire = time . Now ( ) . Add ( ps . params . RetainScore )
2020-03-08 14:48:45 +00:00
}
2020-03-01 12:29:24 +00:00
2020-03-08 14:48:45 +00:00
// Join is a no-op; the scorer does not track topic joins.
func (ps *peerScore) Join(topic string) {
}

// Leave is a no-op; the scorer does not track topic leaves.
func (ps *peerScore) Leave(topic string) {
}
func ( ps * peerScore ) Graft ( p peer . ID , topic string ) {
2020-03-08 18:21:16 +00:00
ps . Lock ( )
defer ps . Unlock ( )
pstats , ok := ps . peerStats [ p ]
if ! ok {
return
}
tstats , ok := pstats . getTopicStats ( topic , ps . params )
if ! ok {
return
}
tstats . inMesh = true
tstats . graftTime = time . Now ( )
tstats . meshTime = 0
2020-03-08 22:08:38 +00:00
tstats . meshMessageDeliveriesActive = false
2020-03-01 12:29:24 +00:00
}
2020-03-08 14:48:45 +00:00
func ( ps * peerScore ) Prune ( p peer . ID , topic string ) {
2020-03-08 18:21:16 +00:00
ps . Lock ( )
defer ps . Unlock ( )
pstats , ok := ps . peerStats [ p ]
if ! ok {
return
}
tstats , ok := pstats . getTopicStats ( topic , ps . params )
if ! ok {
return
}
2020-03-08 23:17:11 +00:00
// sticky mesh delivery rate failure penalty
threshold := ps . params . Topics [ topic ] . MeshMessageDeliveriesThreshold
if tstats . meshMessageDeliveriesActive && tstats . meshMessageDeliveries < threshold {
deficit := threshold - tstats . meshMessageDeliveries
tstats . meshFailurePenalty += deficit * deficit
2020-03-08 22:08:38 +00:00
}
2020-03-08 18:21:16 +00:00
tstats . inMesh = false
2020-03-08 14:48:45 +00:00
}
2020-03-01 12:29:24 +00:00
2020-03-08 20:11:26 +00:00
// ValidateMessage is invoked when the pubsub subsystem begins validating msg.
// It makes sure a delivery record exists, so that the time spent in the
// validation pipeline is tracked with an accurate firstSeen time.
func (ps *peerScore) ValidateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	// only the side effect (record creation) matters here; the record itself
	// is not needed
	ps.deliveries.getRecord(ps.msgID(msg.Message))
}
2020-03-08 14:48:45 +00:00
func ( ps * peerScore ) DeliverMessage ( msg * Message ) {
ps . Lock ( )
defer ps . Unlock ( )
ps . markFirstMessageDelivery ( msg . ReceivedFrom , msg )
drec := ps . deliveries . getRecord ( ps . msgID ( msg . Message ) )
2020-05-04 06:56:51 +00:00
// defensive check that this is the first delivery trace -- delivery status should be unknown
if drec . status != deliveryUnknown {
2020-09-01 17:22:30 +00:00
log . Debugf ( "unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d" , msg . ReceivedFrom , time . Now ( ) . Sub ( drec . firstSeen ) , drec . status )
2020-05-04 06:56:51 +00:00
return
}
2020-03-08 20:08:53 +00:00
// mark the message as valid and reward mesh peers that have already forwarded it to us
2020-03-27 18:34:00 +00:00
drec . status = deliveryValid
2020-03-08 20:44:45 +00:00
drec . validated = time . Now ( )
2020-03-08 20:08:53 +00:00
for p := range drec . peers {
2020-03-09 09:16:50 +00:00
// this check is to make sure a peer can't send us a message twice and get a double count
// if it is a first delivery.
if p != msg . ReceivedFrom {
ps . markDuplicateMessageDelivery ( p , msg , time . Time { } )
}
2020-03-08 14:48:45 +00:00
}
2020-03-01 12:29:24 +00:00
}
2020-03-08 14:48:45 +00:00
func ( ps * peerScore ) RejectMessage ( msg * Message , reason string ) {
ps . Lock ( )
defer ps . Unlock ( )
2020-03-08 19:43:28 +00:00
switch reason {
// we don't track those messages, but we penalize the peer as they are clearly invalid
2020-03-27 18:31:31 +00:00
case rejectMissingSignature :
2020-03-08 19:43:28 +00:00
fallthrough
2020-03-27 18:31:31 +00:00
case rejectInvalidSignature :
2020-03-28 10:52:20 +00:00
fallthrough
2020-07-23 05:47:47 +00:00
case rejectUnexpectedSignature :
fallthrough
case rejectUnexpectedAuthInfo :
fallthrough
2020-03-28 10:52:20 +00:00
case rejectSelfOrigin :
2020-03-08 14:48:45 +00:00
ps . markInvalidMessageDelivery ( msg . ReceivedFrom , msg )
2020-03-08 19:43:28 +00:00
return
// we ignore those messages, so do nothing.
2020-03-27 18:31:31 +00:00
case rejectBlacklstedPeer :
2020-03-08 19:43:28 +00:00
fallthrough
2020-03-27 18:31:31 +00:00
case rejectBlacklistedSource :
2020-03-08 14:48:45 +00:00
return
2020-03-16 16:24:37 +00:00
2020-03-27 18:31:31 +00:00
case rejectValidationQueueFull :
2020-03-16 16:24:37 +00:00
// the message was rejected before it entered the validation pipeline;
// we don't know if this message has a valid signature, and thus we also don't know if
// it has a valid message ID; all we can do is ignore it.
return
2020-03-08 14:48:45 +00:00
}
drec := ps . deliveries . getRecord ( ps . msgID ( msg . Message ) )
2020-03-01 12:29:24 +00:00
2020-05-04 06:56:51 +00:00
// defensive check that this is the first rejection trace -- delivery status should be unknown
if drec . status != deliveryUnknown {
2020-09-01 17:22:30 +00:00
log . Debugf ( "unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d" , msg . ReceivedFrom , time . Now ( ) . Sub ( drec . firstSeen ) , drec . status )
2020-05-04 06:56:51 +00:00
return
}
2020-04-28 14:40:36 +00:00
switch reason {
case rejectValidationThrottled :
2020-03-08 14:48:45 +00:00
// if we reject with "validation throttled" we don't penalize the peer(s) that forward it
// because we don't know if it was valid.
2020-03-27 18:34:00 +00:00
drec . status = deliveryThrottled
2020-03-08 14:48:45 +00:00
// release the delivery time tracking map to free some memory early
drec . peers = nil
return
2020-04-28 14:40:36 +00:00
case rejectValidationIgnored :
// we were explicitly instructed by the validator to ignore the message but not penalize
// the peer
drec . status = deliveryIgnored
drec . peers = nil
return
2020-03-08 14:48:45 +00:00
}
// mark the message as invalid and penalize peers that have already forwarded it.
2020-03-27 18:34:00 +00:00
drec . status = deliveryInvalid
2020-04-24 20:06:22 +00:00
ps . markInvalidMessageDelivery ( msg . ReceivedFrom , msg )
2020-03-08 14:48:45 +00:00
for p := range drec . peers {
ps . markInvalidMessageDelivery ( p , msg )
}
// release the delivery time tracking map to free some memory early
drec . peers = nil
2020-03-01 12:29:24 +00:00
}
2020-03-08 14:48:45 +00:00
func ( ps * peerScore ) DuplicateMessage ( msg * Message ) {
ps . Lock ( )
defer ps . Unlock ( )
drec := ps . deliveries . getRecord ( ps . msgID ( msg . Message ) )
_ , ok := drec . peers [ msg . ReceivedFrom ]
if ok {
// we have already seen this duplicate!
return
}
2020-03-01 12:29:24 +00:00
2020-03-08 14:48:45 +00:00
switch drec . status {
2020-03-27 18:34:00 +00:00
case deliveryUnknown :
2020-03-08 20:08:53 +00:00
// the message is being validated; track the peer delivery and wait for
2020-03-08 14:48:45 +00:00
// the Deliver/Reject notification.
2020-03-08 20:08:53 +00:00
drec . peers [ msg . ReceivedFrom ] = struct { } { }
2020-03-08 14:48:45 +00:00
2020-03-27 18:34:00 +00:00
case deliveryValid :
2020-03-08 14:48:45 +00:00
// mark the peer delivery time to only count a duplicate delivery once.
2020-03-08 20:08:53 +00:00
drec . peers [ msg . ReceivedFrom ] = struct { } { }
2020-03-08 20:44:45 +00:00
ps . markDuplicateMessageDelivery ( msg . ReceivedFrom , msg , drec . validated )
2020-03-08 14:48:45 +00:00
2020-03-27 18:34:00 +00:00
case deliveryInvalid :
2020-03-08 14:48:45 +00:00
// we no longer track delivery time
ps . markInvalidMessageDelivery ( msg . ReceivedFrom , msg )
2020-03-27 18:34:00 +00:00
case deliveryThrottled :
2020-03-09 09:16:50 +00:00
// the message was throttled; do nothing (we don't know if it was valid)
2020-04-28 14:40:36 +00:00
case deliveryIgnored :
// the message was ignored; do nothing
2020-03-08 14:48:45 +00:00
}
2020-03-01 12:29:24 +00:00
}
2020-09-02 18:32:10 +00:00
// ThrottlePeer is a no-op; the scorer does not react to peer throttling.
func (ps *peerScore) ThrottlePeer(p peer.ID) {
}
2020-03-08 14:48:45 +00:00
// message delivery records
func ( d * messageDeliveries ) getRecord ( id string ) * deliveryRecord {
2020-03-09 11:07:37 +00:00
rec , ok := d . records [ id ]
if ok {
return rec
}
2020-05-04 06:56:51 +00:00
now := time . Now ( )
rec = & deliveryRecord { peers : make ( map [ peer . ID ] struct { } ) , firstSeen : now }
2020-03-09 11:07:37 +00:00
d . records [ id ] = rec
2020-05-04 06:56:51 +00:00
entry := & deliveryEntry { id : id , expire : now . Add ( TimeCacheDuration ) }
2020-03-09 11:07:37 +00:00
if d . tail != nil {
d . tail . next = entry
d . tail = entry
} else {
d . head = entry
d . tail = entry
}
return rec
}
func ( d * messageDeliveries ) gc ( ) {
if d . head == nil {
return
}
now := time . Now ( )
for d . head != nil && now . After ( d . head . expire ) {
delete ( d . records , d . head . id )
d . head = d . head . next
}
if d . head == nil {
d . tail = nil
}
2020-03-08 14:48:45 +00:00
}
2020-04-01 22:20:53 +00:00
// getTopicStats returns existing topic stats for a given a given (peer, topic)
// tuple, or initialises a new topicStats object and inserts it in the
// peerStats, iff the topic is scored.
2020-03-08 14:48:45 +00:00
func ( pstats * peerStats ) getTopicStats ( topic string , params * PeerScoreParams ) ( * topicStats , bool ) {
tstats , ok := pstats . topics [ topic ]
if ok {
return tstats , true
}
_ , scoredTopic := params . Topics [ topic ]
if ! scoredTopic {
return nil , false
}
2020-03-01 12:29:24 +00:00
2020-03-08 14:48:45 +00:00
tstats = & topicStats { }
pstats . topics [ topic ] = tstats
return tstats , true
2020-03-01 12:29:24 +00:00
}
2020-04-01 22:20:53 +00:00
// markInvalidMessageDelivery increments the "invalid message deliveries"
// counter for all scored topics the message is published in.
2020-03-08 14:48:45 +00:00
func ( ps * peerScore ) markInvalidMessageDelivery ( p peer . ID , msg * Message ) {
pstats , ok := ps . peerStats [ p ]
if ! ok {
return
}
2020-09-29 15:05:54 +00:00
topic := msg . GetTopic ( )
tstats , ok := pstats . getTopicStats ( topic , ps . params )
if ! ok {
return
2020-03-08 14:48:45 +00:00
}
2020-09-29 15:05:54 +00:00
tstats . invalidMessageDeliveries += 1
2020-03-01 12:29:24 +00:00
}
2020-04-01 22:20:53 +00:00
// markFirstMessageDelivery increments the "first message deliveries" counter
// for all scored topics the message is published in, as well as the "mesh
// message deliveries" counter, if the peer is in the mesh for the topic.
2020-03-08 14:48:45 +00:00
func ( ps * peerScore ) markFirstMessageDelivery ( p peer . ID , msg * Message ) {
pstats , ok := ps . peerStats [ p ]
if ! ok {
return
}
2020-09-29 15:05:54 +00:00
topic := msg . GetTopic ( )
tstats , ok := pstats . getTopicStats ( topic , ps . params )
if ! ok {
return
}
2020-03-08 14:48:45 +00:00
2020-09-29 15:05:54 +00:00
cap := ps . params . Topics [ topic ] . FirstMessageDeliveriesCap
tstats . firstMessageDeliveries += 1
if tstats . firstMessageDeliveries > cap {
tstats . firstMessageDeliveries = cap
}
2020-03-08 14:48:45 +00:00
2020-09-29 15:05:54 +00:00
if ! tstats . inMesh {
return
}
2020-03-01 12:29:24 +00:00
2020-09-29 15:05:54 +00:00
cap = ps . params . Topics [ topic ] . MeshMessageDeliveriesCap
tstats . meshMessageDeliveries += 1
if tstats . meshMessageDeliveries > cap {
tstats . meshMessageDeliveries = cap
2020-03-08 14:48:45 +00:00
}
2020-03-01 12:29:24 +00:00
}
2020-04-01 22:20:53 +00:00
// markDuplicateMessageDelivery increments the "mesh message deliveries" counter
// for messages we've seen before, as long the message was received within the
// P3 window.
2020-03-08 20:44:45 +00:00
func ( ps * peerScore ) markDuplicateMessageDelivery ( p peer . ID , msg * Message , validated time . Time ) {
2020-03-08 14:48:45 +00:00
pstats , ok := ps . peerStats [ p ]
if ! ok {
return
}
2020-09-29 15:05:54 +00:00
topic := msg . GetTopic ( )
tstats , ok := pstats . getTopicStats ( topic , ps . params )
if ! ok {
return
2020-03-08 20:08:53 +00:00
}
2020-09-29 15:05:54 +00:00
if ! tstats . inMesh {
return
}
2020-03-01 12:29:24 +00:00
2020-09-29 15:05:54 +00:00
tparams := ps . params . Topics [ topic ]
2020-04-01 22:20:53 +00:00
2020-09-29 15:05:54 +00:00
// check against the mesh delivery window -- if the validated time is passed as 0, then
// the message was received before we finished validation and thus falls within the mesh
// delivery window.
if ! validated . IsZero ( ) && time . Since ( validated ) > tparams . MeshMessageDeliveriesWindow {
return
}
2020-03-08 14:48:45 +00:00
2020-09-29 15:05:54 +00:00
cap := tparams . MeshMessageDeliveriesCap
tstats . meshMessageDeliveries += 1
if tstats . meshMessageDeliveries > cap {
tstats . meshMessageDeliveries = cap
2020-03-08 14:48:45 +00:00
}
2020-03-01 12:29:24 +00:00
}
2020-03-08 18:59:49 +00:00
2020-04-01 22:20:53 +00:00
// getIPs gets the current IPs for a peer.
2020-03-08 18:59:49 +00:00
func ( ps * peerScore ) getIPs ( p peer . ID ) [ ] string {
2020-03-09 11:22:57 +00:00
// in unit tests this can be nil
if ps . host == nil {
return nil
}
conns := ps . host . Network ( ) . ConnsToPeer ( p )
2020-04-14 09:42:43 +00:00
res := make ( [ ] string , 0 , 1 )
2020-03-09 11:22:57 +00:00
for _ , c := range conns {
remote := c . RemoteMultiaddr ( )
2020-03-27 18:18:38 +00:00
ip , err := manet . ToIP ( remote )
if err != nil {
continue
}
2020-03-09 11:22:57 +00:00
2020-04-14 16:11:12 +00:00
// ignore those; loopback is used for unit testing
if ip . IsLoopback ( ) {
2020-04-14 09:42:43 +00:00
continue
}
2020-04-14 16:11:12 +00:00
if len ( ip . To4 ( ) ) == 4 {
2020-03-27 18:18:38 +00:00
// IPv4 address
ip4 := ip . String ( )
2020-03-09 11:22:57 +00:00
res = append ( res , ip4 )
2020-04-14 16:11:12 +00:00
} else {
2020-03-27 18:18:38 +00:00
// IPv6 address -- we add both the actual address and the /64 subnet
ip6 := ip . String ( )
2020-03-09 11:22:57 +00:00
res = append ( res , ip6 )
2020-03-27 18:18:38 +00:00
2020-04-14 16:11:12 +00:00
ip6mask := ip . Mask ( net . CIDRMask ( 64 , 128 ) ) . String ( )
res = append ( res , ip6mask )
2020-03-09 11:22:57 +00:00
}
}
return res
2020-03-08 18:59:49 +00:00
}
2020-04-01 22:20:53 +00:00
// setIPs adds tracking for the new IPs in the list, and removes tracking from
// the obsolete IPs.
2020-03-09 11:22:57 +00:00
func ( ps * peerScore ) setIPs ( p peer . ID , newips , oldips [ ] string ) {
2020-03-08 18:59:49 +00:00
addNewIPs :
// add the new IPs to the tracking
for _ , ip := range newips {
// check if it is in the old ips list
for _ , xip := range oldips {
if ip == xip {
continue addNewIPs
}
}
// no, it's a new one -- add it to the tracker
peers , ok := ps . peerIPs [ ip ]
if ! ok {
peers = make ( map [ peer . ID ] struct { } )
ps . peerIPs [ ip ] = peers
}
peers [ p ] = struct { } { }
}
removeOldIPs :
// remove the obsolete old IPs from the tracking
for _ , ip := range oldips {
// check if it is in the new ips list
for _ , xip := range newips {
if ip == xip {
continue removeOldIPs
}
2020-04-14 07:16:40 +00:00
}
// no, it's obsolete -- remove it from the tracker
peers , ok := ps . peerIPs [ ip ]
if ! ok {
continue
}
delete ( peers , p )
if len ( peers ) == 0 {
delete ( ps . peerIPs , ip )
2020-03-08 18:59:49 +00:00
}
}
}
2020-04-01 22:20:53 +00:00
// removeIPs removes an IP list from the tracking list for a peer.
2020-03-08 18:59:49 +00:00
func ( ps * peerScore ) removeIPs ( p peer . ID , ips [ ] string ) {
for _ , ip := range ips {
peers , ok := ps . peerIPs [ ip ]
if ! ok {
continue
}
delete ( peers , p )
if len ( peers ) == 0 {
delete ( ps . peerIPs , ip )
}
}
}