194 lines
5.8 KiB
Go
194 lines
5.8 KiB
Go
|
package pubsub
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/libp2p/go-libp2p-core/connmgr"
|
||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||
|
)
|
||
|
|
||
|
// Tunable connection manager tag parameters used by the tag tracer.
var (
	// GossipSubConnTagValueDirectPeer is the connection manager tag value
	// given to direct peers. It is deliberately high so that these
	// connections are prioritized above all others.
	GossipSubConnTagValueDirectPeer = 1000

	// GossipSubConnTagValueMeshPeer is the connection manager tag value given
	// to a peer in a topic mesh. A peer that is in the mesh for several
	// topics has its connection tagged once per topic.
	GossipSubConnTagValueMeshPeer = 20

	// GossipSubConnTagBumpMessageDelivery is the increment applied to the
	// connection manager tag that tracks message deliveries. Whenever a peer
	// is the first to deliver a message within a topic, its tag is bumped by
	// this amount, never exceeding GossipSubConnTagMessageDeliveryCap.
	// Delivery tags decay over time: they lose GossipSubConnTagDecayAmount
	// at every GossipSubConnTagDecayInterval.
	GossipSubConnTagBumpMessageDelivery = 1

	// GossipSubConnTagDecayInterval is how often decaying connection manager
	// tags are decayed.
	GossipSubConnTagDecayInterval = time.Minute

	// GossipSubConnTagDecayAmount is the amount subtracted from a decaying
	// tag value at each decay interval.
	GossipSubConnTagDecayAmount = 1

	// GossipSubConnTagMessageDeliveryCap is the upper bound for the
	// connection manager tags that track message deliveries.
	GossipSubConnTagMessageDeliveryCap = 15
)
|
||
|
|
||
|
// tagTracer is an internal tracer that applies connection manager tags to peer
|
||
|
// connections based on their behavior.
|
||
|
//
|
||
|
// We tag a peer's connections for the following reasons:
|
||
|
// - Directly connected peers are tagged with GossipSubConnTagValueDirectPeer (default 1000).
|
||
|
// - Mesh peers are tagged with a value of GossipSubConnTagValueMeshPeer (default 20).
|
||
|
// If a peer is in multiple topic meshes, they'll be tagged for each.
|
||
|
// - For each message that we receive, we bump a delivery tag for peer that delivered the message
|
||
|
// first.
|
||
|
// The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at
|
||
|
// a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval (default 1/minute).
|
||
|
// With the defaults, a peer who stops delivering messages will have their delivery tag decay to zero
|
||
|
// in fifteen minutes.
|
||
|
type tagTracer struct {
|
||
|
sync.Mutex
|
||
|
|
||
|
cmgr connmgr.ConnManager
|
||
|
decayer connmgr.Decayer
|
||
|
decaying map[string]connmgr.DecayingTag
|
||
|
direct map[peer.ID]struct{}
|
||
|
}
|
||
|
|
||
|
func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
|
||
|
decayer, ok := connmgr.SupportsDecay(cmgr)
|
||
|
if !ok {
|
||
|
log.Warnf("connection manager does not support decaying tags, delivery tags will not be applied")
|
||
|
}
|
||
|
return &tagTracer{
|
||
|
cmgr: cmgr,
|
||
|
decayer: decayer,
|
||
|
decaying: make(map[string]connmgr.DecayingTag),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
|
||
|
if t.direct == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// tag peer if it is a direct peer
|
||
|
_, direct := t.direct[p]
|
||
|
if direct {
|
||
|
t.cmgr.TagPeer(p, "pubsub:direct", GossipSubConnTagValueDirectPeer)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) tagMeshPeer(p peer.ID, topic string) {
|
||
|
tag := topicTag(topic)
|
||
|
t.cmgr.TagPeer(p, tag, GossipSubConnTagValueMeshPeer)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) untagMeshPeer(p peer.ID, topic string) {
|
||
|
tag := topicTag(topic)
|
||
|
t.cmgr.UntagPeer(p, tag)
|
||
|
}
|
||
|
|
||
|
// topicTag returns the connection manager tag name used for mesh peers of the
// given topic: the topic prefixed with "pubsub:".
func topicTag(topic string) string {
	name := fmt.Sprintf("pubsub:%s", topic)
	return name
}
|
||
|
|
||
|
func (t *tagTracer) addDeliveryTag(topic string) {
|
||
|
if t.decayer == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
t.Lock()
|
||
|
defer t.Unlock()
|
||
|
tag, err := t.decayingDeliveryTag(topic)
|
||
|
if err != nil {
|
||
|
log.Warnf("unable to create decaying delivery tag: %s", err)
|
||
|
return
|
||
|
}
|
||
|
t.decaying[topic] = tag
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) removeDeliveryTag(topic string) {
|
||
|
t.Lock()
|
||
|
defer t.Unlock()
|
||
|
delete(t.decaying, topic)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) decayingDeliveryTag(topic string) (connmgr.DecayingTag, error) {
|
||
|
name := fmt.Sprintf("pubsub-deliveries:%s", topic)
|
||
|
|
||
|
// decrement tag value by GossipSubConnTagDecayAmount at each decay interval
|
||
|
decayFn := func(value connmgr.DecayingValue) (after int, rm bool) {
|
||
|
v := value.Value - GossipSubConnTagDecayAmount
|
||
|
return v, v <= 0
|
||
|
}
|
||
|
|
||
|
// bump up to max of GossipSubConnTagMessageDeliveryCap
|
||
|
bumpFn := func(value connmgr.DecayingValue, delta int) (after int) {
|
||
|
val := value.Value + delta
|
||
|
if val > GossipSubConnTagMessageDeliveryCap {
|
||
|
return GossipSubConnTagMessageDeliveryCap
|
||
|
}
|
||
|
return val
|
||
|
}
|
||
|
|
||
|
return t.decayer.RegisterDecayingTag(name, GossipSubConnTagDecayInterval, decayFn, bumpFn)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error {
|
||
|
tag, ok := t.decaying[topic]
|
||
|
if !ok {
|
||
|
return fmt.Errorf("no decaying tag registered for topic %s", topic)
|
||
|
}
|
||
|
return tag.Bump(p, GossipSubConnTagBumpMessageDelivery)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
|
||
|
for _, topic := range msg.TopicIDs {
|
||
|
err := t.bumpDeliveryTag(p, topic)
|
||
|
if err != nil {
|
||
|
log.Warnf("error bumping delivery tag: %s", err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
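// -- internalTracer interface methods. The blank-identifier declaration below
// is a compile-time check that *tagTracer implements internalTracer.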
|
||
|
var _ internalTracer = (*tagTracer)(nil)
|
||
|
|
||
|
func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
|
||
|
t.tagPeerIfDirect(p)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) Join(topic string) {
|
||
|
t.addDeliveryTag(topic)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) DeliverMessage(msg *Message) {
|
||
|
// TODO: also give a bump to "near-first" message deliveries
|
||
|
t.bumpTagsForMessage(msg.ReceivedFrom, msg)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) Leave(topic string) {
|
||
|
t.removeDeliveryTag(topic)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) Graft(p peer.ID, topic string) {
|
||
|
t.tagMeshPeer(p, topic)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) Prune(p peer.ID, topic string) {
|
||
|
t.untagMeshPeer(p, topic)
|
||
|
}
|
||
|
|
||
|
func (t *tagTracer) RemovePeer(peer.ID) {}
|
||
|
func (t *tagTracer) ValidateMessage(*Message) {}
|
||
|
func (t *tagTracer) RejectMessage(*Message, string) {}
|
||
|
func (t *tagTracer) DuplicateMessage(*Message) {}
|