package pubsub

import (
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/protocol"
)

type PeerScoreParams struct {
	// Score parameters per topic.
	Topics map[string]*TopicScoreParams

	// P5: Application-specific peer scoring
	AppSpecificScore  func(p peer.ID) float64
	AppSpecificWeight float64

	// P6: IP-colocation factor.
	// The parameter has an associated counter which counts the number of peers with the same IP.
	// If the number of peers with the same IP exceeds IPColocationFactorThreshold, then the value
	// is the square of the difference, i.e. (PeersInSameIP - IPColocationFactorThreshold)^2;
	// for instance, with a threshold of 1 and 4 peers behind the same IP, the value is (4-1)^2 = 9.
	// If the number of peers with the same IP is at or below the threshold, then the value is 0.
	// The weight of the parameter MUST be negative, unless you want to disable it for testing.
	// Note: in order to simulate many IPs in a manageable manner when testing, you can set the
	// weight to 0, thus disabling the IP colocation penalty.
	IPColocationFactorWeight    float64
	IPColocationFactorThreshold int

	// the decay interval for parameter counters.
	DecayInterval time.Duration

	// counter value below which it is considered 0.
	DecayToZero float64

	// time to remember counters for a disconnected peer.
	RetainScore time.Duration
}
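
// Illustrative sketch (an assumption, not recommended defaults): how a caller
// might populate PeerScoreParams; every concrete value below is a placeholder,
// and mytopicParams is a hypothetical *TopicScoreParams.
//
//	params := &PeerScoreParams{
//		Topics:                      map[string]*TopicScoreParams{"mytopic": mytopicParams},
//		AppSpecificScore:            func(p peer.ID) float64 { return 0 },
//		AppSpecificWeight:           1,
//		IPColocationFactorWeight:    -1,
//		IPColocationFactorThreshold: 1,
//		DecayInterval:               time.Minute,
//		DecayToZero:                 0.01,
//		RetainScore:                 10 * time.Minute,
//	}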

type TopicScoreParams struct {
	// The weight of the topic.
	TopicWeight float64

	// P1: time in the mesh
	// This is the time the peer has been grafted in the mesh.
	// The value of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap.
	// The weight of the parameter MUST be positive.
	TimeInMeshWeight  float64
	TimeInMeshQuantum time.Duration
	TimeInMeshCap     float64

	// P2: first message deliveries
	// This is the number of message deliveries in the topic.
	// The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and
	// capped by FirstMessageDeliveriesCap.
	// The weight of the parameter MUST be positive.
	FirstMessageDeliveriesWeight, FirstMessageDeliveriesDecay float64
	FirstMessageDeliveriesCap                                 float64

	// P3: mesh message deliveries
	// This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow
	// of message validation; deliveries during validation also count and are retroactively applied
	// when validation succeeds.
	// This window accounts for the minimum time before a hostile mesh peer trying to game the score
	// could replay back a valid message we just sent them.
	// It effectively tracks first and near-first deliveries, i.e. a message seen from a mesh peer
	// before we have forwarded it to them.
	// The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
	// If the counter is at or above the MeshMessageDeliveriesThreshold, the parameter's value is 0.
	// If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of
	// the deficit, i.e. (MeshMessageDeliveriesThreshold - counter)^2.
	// The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
	// The weight of the parameter MUST be negative (or zero if you want to disable it).
	MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay      float64
	MeshMessageDeliveriesCap, MeshMessageDeliveriesThreshold     float64
	MeshMessageDeliveriesWindow, MeshMessageDeliveriesActivation time.Duration

	// P3b: sticky mesh propagation failures
	// This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
	// mesh message delivery penalty.
	// The weight of the parameter MUST be negative (or zero if you want to disable it).
	MeshFailurePenaltyWeight, MeshFailurePenaltyDecay float64

	// P4: invalid messages
	// This is the number of invalid messages delivered in the topic.
	// The value of the parameter is a counter, decaying with InvalidMessageDeliveriesDecay.
	// The weight of the parameter MUST be negative.
	InvalidMessageDeliveriesWeight, InvalidMessageDeliveriesDecay float64
}
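
// Illustrative example (made-up numbers, not recommended defaults): with
// TimeInMeshQuantum = time.Second, TimeInMeshCap = 3600, TimeInMeshWeight = 0.01,
// FirstMessageDeliveriesWeight = 1 and TopicWeight = 0.5, a peer that has been
// 120s in the mesh with 40 first message deliveries contributes
//
//	p1 := 120.0 // 120s / 1s quantum, below the cap
//	p2 := 40.0
//	topicScore := p1*0.01 + p2*1.0 // = 1.2 + 40 = 41.2
//	score += topicScore * 0.5      // = 20.6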

type peerStats struct {
	// true if the peer is currently connected
	connected bool

	// expiration time of the score stats for disconnected peers
	expire time.Time

	// per topic stats
	topics map[string]*topicStats

	// IP tracking; store as string for easy processing
	ips []string
}

type topicStats struct {
	// true if the peer is in the mesh
	inMesh bool

	// time when the peer was (last) GRAFTed; valid only when in mesh
	graftTime time.Time

	// time in mesh (updated during refresh/decay to avoid calling gettimeofday on
	// every score invocation)
	meshTime time.Duration

	// first message deliveries
	firstMessageDeliveries float64

	// mesh message deliveries
	meshMessageDeliveries float64

	// true if the peer has been in the mesh long enough to activate mesh message deliveries
	meshMessageDeliveriesActive bool

	// sticky mesh rate failure penalty counter
	meshFailurePenalty float64

	// invalid message counter
	invalidMessageDeliveries float64
}

type peerScore struct {
	sync.Mutex

	// the score parameters
	params *PeerScoreParams

	// per peer stats for score calculation
	peerStats map[peer.ID]*peerStats

	// IP colocation tracking
	peerIPs map[string]map[peer.ID]struct{}

	// message delivery tracking
	deliveries *messageDeliveries

	msgID MsgIdFunction
}

type messageDeliveries struct {
	// TODO
}

type deliveryRecord struct {
	// validation status of the message (see the delivery_* constants below)
	status int

	// time when the message was validated
	validated time.Time

	// peers that delivered the message; used to avoid double-counting duplicate deliveries
	peers map[peer.ID]struct{}
}

// delivery record status
const (
	delivery_unknown   = iota // we don't know (yet) if the message is valid
	delivery_valid            // we know the message is valid
	delivery_invalid          // we know the message is invalid
	delivery_throttled        // we can't tell if it is valid because validation was throttled
)
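
// Lifecycle note: a record starts as delivery_unknown when ValidateMessage
// first sees the message, and transitions to delivery_valid (DeliverMessage)
// or to delivery_invalid/delivery_throttled (RejectMessage); DuplicateMessage
// consults this status to decide how to count a redelivery.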

func newPeerScore(params *PeerScoreParams) *peerScore {
	// TODO
	return nil
}
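
// A minimal sketch of what the constructor above will plausibly do once the
// TODO is filled in (an assumption, not the upstream implementation): allocate
// the tracking maps and retain the parameters, e.g.
//
//	return &peerScore{
//		params:    params,
//		peerStats: make(map[peer.ID]*peerStats),
//		peerIPs:   make(map[string]map[peer.ID]struct{}),
//	}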

// router interface
func (ps *peerScore) Start(gs *GossipSubRouter) {
	// TODO
}

func (ps *peerScore) Score(p peer.ID) float64 {
	if ps == nil {
		return 0
	}

	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return 0
	}

	var score float64

	// topic scores
	for topic, tstats := range pstats.topics {
		// the topic score
		var topicScore float64

		// the topic parameters
		topicParams, ok := ps.params.Topics[topic]
		if !ok {
			// we are not scoring this topic
			continue
		}

		// P1: time in mesh
		if tstats.inMesh {
			p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum)
			if p1 > topicParams.TimeInMeshCap {
				p1 = topicParams.TimeInMeshCap
			}
			topicScore += p1 * topicParams.TimeInMeshWeight
		}

		// P2: first message deliveries
		p2 := tstats.firstMessageDeliveries
		topicScore += p2 * topicParams.FirstMessageDeliveriesWeight

		// P3: mesh message deliveries
		if tstats.meshMessageDeliveriesActive {
			if tstats.meshMessageDeliveries < topicParams.MeshMessageDeliveriesThreshold {
				deficit := topicParams.MeshMessageDeliveriesThreshold - tstats.meshMessageDeliveries
				p3 := deficit * deficit
				topicScore += p3 * topicParams.MeshMessageDeliveriesWeight
			}
		}

		// P3b: sticky mesh propagation failure penalty
		p3b := tstats.meshFailurePenalty
		topicScore += p3b * topicParams.MeshFailurePenaltyWeight

		// P4: invalid messages
		p4 := tstats.invalidMessageDeliveries
		topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight

		// update score, mixing with the topic weight
		score += topicScore * topicParams.TopicWeight
	}

	// P5: application-specific score
	p5 := ps.params.AppSpecificScore(p)
	score += p5 * ps.params.AppSpecificWeight

	// P6: IP colocation factor
	for _, ip := range pstats.ips {
		peersInIP := len(ps.peerIPs[ip])
		if peersInIP > ps.params.IPColocationFactorThreshold {
			surplus := float64(peersInIP - ps.params.IPColocationFactorThreshold)
			p6 := surplus * surplus
			score += p6 * ps.params.IPColocationFactorWeight
		}
	}

	return score
}
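
// Illustrative usage (an assumption -- the callers live elsewhere in the
// router): the router can consult the score when making mesh decisions, e.g.
//
//	if ps.Score(p) < 0 {
//		// negative score: candidate for pruning from the mesh
//	}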

// periodic maintenance
func (ps *peerScore) refreshScores() {
	ps.Lock()
	defer ps.Unlock()

	now := time.Now()
	for p, pstats := range ps.peerStats {
		if !pstats.connected {
			// has the retention period expired?
			if now.After(pstats.expire) {
				// yes, throw it away (but clean up the IP tracking first)
				ps.removeIPs(p, pstats.ips)
				delete(ps.peerStats, p)
			}

			// we don't decay retained scores, as the peer is not active.
			// this way the peer cannot reset a negative score by simply disconnecting and reconnecting,
			// unless the retention period has elapsed.
			// similarly, a well-behaved peer does not lose its score by getting disconnected.
			continue
		}

		for topic, tstats := range pstats.topics {
			// the topic parameters
			topicParams, ok := ps.params.Topics[topic]
			if !ok {
				// we are not scoring this topic
				continue
			}

			// decay counters
			tstats.firstMessageDeliveries *= topicParams.FirstMessageDeliveriesDecay
			if tstats.firstMessageDeliveries < ps.params.DecayToZero {
				tstats.firstMessageDeliveries = 0
			}
			tstats.meshMessageDeliveries *= topicParams.MeshMessageDeliveriesDecay
			if tstats.meshMessageDeliveries < ps.params.DecayToZero {
				tstats.meshMessageDeliveries = 0
			}
			tstats.meshFailurePenalty *= topicParams.MeshFailurePenaltyDecay
			if tstats.meshFailurePenalty < ps.params.DecayToZero {
				tstats.meshFailurePenalty = 0
			}
			tstats.invalidMessageDeliveries *= topicParams.InvalidMessageDeliveriesDecay
			if tstats.invalidMessageDeliveries < ps.params.DecayToZero {
				tstats.invalidMessageDeliveries = 0
			}

			// update mesh time and activate mesh message delivery parameter if need be
			if tstats.inMesh {
				tstats.meshTime = now.Sub(tstats.graftTime)
				if tstats.meshTime > topicParams.MeshMessageDeliveriesActivation {
					tstats.meshMessageDeliveriesActive = true
				}
			}
		}
	}
}
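
// A worked example of the decay above (illustrative numbers, not defaults):
// with FirstMessageDeliveriesDecay = 0.9 and DecayToZero = 0.01, a counter at
// 10 decays to 9, 8.1, 7.29, ... and snaps to 0 once it falls below 0.01,
// i.e. after roughly log(0.001)/log(0.9) ≈ 66 refresh intervals.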

func (ps *peerScore) refreshIPs() {
	// peer IPs may change, so we periodically refresh them
	// TODO
}

// tracer interface
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		pstats = &peerStats{topics: make(map[string]*topicStats)}
		ps.peerStats[p] = pstats
	}

	pstats.connected = true
	ips := ps.getIPs(p)
	ps.addIPs(p, ips, pstats.ips)
	pstats.ips = ips
}

func (ps *peerScore) RemovePeer(p peer.ID) {
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	pstats.connected = false
	pstats.expire = time.Now().Add(ps.params.RetainScore)
}

func (ps *peerScore) Join(topic string)  {}
func (ps *peerScore) Leave(topic string) {}

func (ps *peerScore) Graft(p peer.ID, topic string) {
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	tstats, ok := pstats.getTopicStats(topic, ps.params)
	if !ok {
		return
	}

	tstats.inMesh = true
	tstats.graftTime = time.Now()
	tstats.meshTime = 0
	tstats.meshMessageDeliveriesActive = false
}

func (ps *peerScore) Prune(p peer.ID, topic string) {
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	tstats, ok := pstats.getTopicStats(topic, ps.params)
	if !ok {
		return
	}

	// sticky mesh delivery rate failure penalty
	threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
	if tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold {
		deficit := threshold - tstats.meshMessageDeliveries
		tstats.meshFailurePenalty += deficit * deficit
	}

	tstats.inMesh = false
}
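
// Worked example of the sticky penalty above (illustrative numbers): with a
// MeshMessageDeliveriesThreshold of 4 and only 1 mesh delivery at prune time,
// the deficit is 3, so meshFailurePenalty grows by 9 and keeps penalizing the
// peer (decaying over time) even after it leaves the mesh.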

func (ps *peerScore) ValidateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	// the pubsub subsystem is beginning validation; create a record to track time in
	// the validation pipeline with an accurate firstSeen time.
	_ = ps.deliveries.getRecord(ps.msgID(msg.Message))
}

func (ps *peerScore) DeliverMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	ps.markFirstMessageDelivery(msg.ReceivedFrom, msg)

	drec := ps.deliveries.getRecord(ps.msgID(msg.Message))

	// mark the message as valid and reward mesh peers that have already forwarded it to us
	drec.status = delivery_valid
	drec.validated = time.Now()
	for p := range drec.peers {
		// this check is to make sure a peer can't send us a message twice and get a double count
		// if it is a first delivery.
		if p != msg.ReceivedFrom {
			ps.markDuplicateMessageDelivery(p, msg, time.Time{})
		}
	}
}

func (ps *peerScore) RejectMessage(msg *Message, reason string) {
	ps.Lock()
	defer ps.Unlock()

	// TODO: the reasons should become named strings; good enough for now.
	switch reason {
	// we don't track these messages, but we penalize the peer as they are clearly invalid
	case "missing signature":
		fallthrough
	case "invalid signature":
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
		return

	// we ignore these messages, so do nothing.
	case "blacklisted peer":
		fallthrough
	case "blacklisted source":
		return
	}

	drec := ps.deliveries.getRecord(ps.msgID(msg.Message))

	if reason == "validation throttled" {
		// if we reject with "validation throttled" we don't penalize the peer(s) that forwarded it
		// because we don't know if it was valid.
		drec.status = delivery_throttled
		// release the delivery time tracking map to free some memory early
		drec.peers = nil
		return
	}

	// mark the message as invalid and penalize peers that have already forwarded it.
	drec.status = delivery_invalid
	for p := range drec.peers {
		ps.markInvalidMessageDelivery(p, msg)
	}

	// release the delivery time tracking map to free some memory early
	drec.peers = nil
}

func (ps *peerScore) DuplicateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	drec := ps.deliveries.getRecord(ps.msgID(msg.Message))

	_, ok := drec.peers[msg.ReceivedFrom]
	if ok {
		// we have already seen this duplicate!
		return
	}

	switch drec.status {
	case delivery_unknown:
		// the message is being validated; track the peer delivery and wait for
		// the Deliver/Reject notification.
		drec.peers[msg.ReceivedFrom] = struct{}{}

	case delivery_valid:
		// mark the peer delivery time to only count a duplicate delivery once.
		drec.peers[msg.ReceivedFrom] = struct{}{}
		ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.validated)

	case delivery_invalid:
		// we no longer track delivery time
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)

	case delivery_throttled:
		// the message was throttled; do nothing (we don't know if it was valid)
	}
}

// message delivery records
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
	// TODO
	return nil
}
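
// A minimal sketch of a map-backed implementation (an assumption -- the TODO
// above is not yet filled in; the `records` field name is hypothetical, and a
// real version would also garbage-collect stale entries):
//
//	rec, ok := d.records[id]
//	if !ok {
//		rec = &deliveryRecord{peers: make(map[peer.ID]struct{})}
//		d.records[id] = rec
//	}
//	return rec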

// utilities
func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) {
	tstats, ok := pstats.topics[topic]
	if ok {
		return tstats, true
	}

	_, scoredTopic := params.Topics[topic]
	if !scoredTopic {
		return nil, false
	}

	tstats = &topicStats{}
	pstats.topics[topic] = tstats

	return tstats, true
}

func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	for _, topic := range msg.GetTopicIDs() {
		tstats, ok := pstats.getTopicStats(topic, ps.params)
		if !ok {
			continue
		}

		tstats.invalidMessageDeliveries += 1
	}
}

func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	for _, topic := range msg.GetTopicIDs() {
		tstats, ok := pstats.getTopicStats(topic, ps.params)
		if !ok {
			continue
		}

		cap := ps.params.Topics[topic].FirstMessageDeliveriesCap
		tstats.firstMessageDeliveries += 1
		if tstats.firstMessageDeliveries > cap {
			tstats.firstMessageDeliveries = cap
		}

		if !tstats.inMesh {
			continue
		}

		cap = ps.params.Topics[topic].MeshMessageDeliveriesCap
		tstats.meshMessageDeliveries += 1
		if tstats.meshMessageDeliveries > cap {
			tstats.meshMessageDeliveries = cap
		}
	}
}

func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
	var now time.Time

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	if !validated.IsZero() {
		now = time.Now()
	}

	for _, topic := range msg.GetTopicIDs() {
		tstats, ok := pstats.getTopicStats(topic, ps.params)
		if !ok {
			continue
		}

		if !tstats.inMesh {
			continue
		}

		// check against the mesh delivery window -- if the validated time is passed as 0, then
		// the message was received before we finished validation and thus falls within the mesh
		// delivery window.
		if !validated.IsZero() && now.After(validated.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) {
			continue
		}

		cap := ps.params.Topics[topic].MeshMessageDeliveriesCap
		tstats.meshMessageDeliveries += 1
		if tstats.meshMessageDeliveries > cap {
			tstats.meshMessageDeliveries = cap
		}
	}
}
|
2020-03-08 18:59:49 +00:00
|
|
|
|
|
|
|
// gets the current IPs for a peer
|
|
|
|
func (ps *peerScore) getIPs(p peer.ID) []string {
|
|
|
|
// TODO
|
|
|
|
return nil
|
|
|
|
}
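
// Sketch of the intended behaviour (an assumption -- the TODO above is not yet
// implemented): once wired to the host via Start, enumerate the open
// connections to p and extract the IP component of each remote multiaddr,
// e.g. with host.Network().ConnsToPeer(p) and manet.ToIP(conn.RemoteMultiaddr()).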

// merges two IP lists, adds tracking for the new IPs in the list, and removes tracking
// from the obsolete ips.
func (ps *peerScore) addIPs(p peer.ID, newips, oldips []string) {
addNewIPs:
	// add the new IPs to the tracking
	for _, ip := range newips {
		// check if it is in the old ips list
		for _, xip := range oldips {
			if ip == xip {
				continue addNewIPs
			}
		}
		// no, it's a new one -- add it to the tracker
		peers, ok := ps.peerIPs[ip]
		if !ok {
			peers = make(map[peer.ID]struct{})
			ps.peerIPs[ip] = peers
		}
		peers[p] = struct{}{}
	}

removeOldIPs:
	// remove the obsolete old IPs from the tracking
	for _, ip := range oldips {
		// check if it is in the new ips list
		for _, xip := range newips {
			if ip == xip {
				continue removeOldIPs
			}
		}
		// no, it's obsolete -- remove it from the tracker
		peers, ok := ps.peerIPs[ip]
		if !ok {
			continue
		}
		delete(peers, p)
		if len(peers) == 0 {
			delete(ps.peerIPs, ip)
		}
	}
}

// removes an IP list from the tracking list
func (ps *peerScore) removeIPs(p peer.ID, ips []string) {
	for _, ip := range ips {
		peers, ok := ps.peerIPs[ip]
		if !ok {
			continue
		}

		delete(peers, p)
		if len(peers) == 0 {
			delete(ps.peerIPs, ip)
		}
	}
}