diff --git a/floodsub_test.go b/floodsub_test.go
index 6245c12..0ddd158 100644
--- a/floodsub_test.go
+++ b/floodsub_test.go
@@ -41,12 +41,16 @@ func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Sub
 	}
 }
 
-func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
+func getNetHosts(t *testing.T, ctx context.Context, n int, options ...func() bhost.Option) []host.Host {
 	var out []host.Host
 
 	for i := 0; i < n; i++ {
 		netw := swarmt.GenSwarm(t, ctx)
-		h := bhost.NewBlankHost(netw)
+		opts := make([]bhost.Option, len(options))
+		for i, optFn := range options {
+			opts[i] = optFn()
+		}
+		h := bhost.NewBlankHost(netw, opts...)
 		out = append(out, h)
 	}
 
diff --git a/gossipsub.go b/gossipsub.go
index c7666f9..a9accd1 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -124,7 +124,12 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
 		opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
 
 		fanoutTTL: GossipSubFanoutTTL,
+
+		tagTracer: newTagTracer(h.ConnManager()),
 	}
+
+	// use the withInternalTracer option to hook up the tag tracer
+	opts = append(opts, withInternalTracer(rt.tagTracer))
 	return NewPubSub(ctx, h, rt, opts...)
 }
 
@@ -224,6 +229,10 @@ func WithDirectPeers(pis []peer.AddrInfo) Option {
 
 		gs.direct = direct
 
+		if gs.tagTracer != nil {
+			gs.tagTracer.direct = direct
+		}
+
 		return nil
 	}
 }
@@ -254,6 +263,7 @@ type GossipSubRouter struct {
 	tracer       *pubsubTracer
 	score        *peerScore
 	gossipTracer *gossipTracer
+	tagTracer    *tagTracer
 
 	// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
 	// nodes.
@@ -347,12 +357,6 @@ func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
 	gs.tracer.AddPeer(p, proto)
 	gs.peers[p] = proto
 
-	// tag peer if it is a direct peer
-	_, direct := gs.direct[p]
-	if direct {
-		gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000)
-	}
-
 	// track the connection direction
 	outbound := false
 	conns := gs.p.host.Network().ConnsToPeer(p)
@@ -621,7 +625,6 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 		log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
 		gs.tracer.Graft(p, topic)
 		peers[p] = struct{}{}
-		gs.tagPeer(p, topic)
 	}
 
 	if len(prune) == 0 {
@@ -649,7 +652,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
 			log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
 			gs.tracer.Prune(p, topic)
 			delete(peers, p)
-			gs.untagPeer(p, topic)
 			// is there a backoff specified by the peer? if so obey it.
 			backoff := prune.GetBackoff()
 			if backoff > 0 {
@@ -889,7 +891,6 @@ func (gs *GossipSubRouter) Join(topic string) {
 		log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
 		gs.tracer.Graft(p, topic)
 		gs.sendGraft(p, topic)
-		gs.tagPeer(p, topic)
 	}
 }
 
@@ -908,7 +909,6 @@ func (gs *GossipSubRouter) Leave(topic string) {
 		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
 		gs.tracer.Prune(p, topic)
 		gs.sendPrune(p, topic)
-		gs.untagPeer(p, topic)
 	}
 }
 
@@ -1168,7 +1168,6 @@ func (gs *GossipSubRouter) heartbeat() {
 		prunePeer := func(p peer.ID) {
 			gs.tracer.Prune(p, topic)
 			delete(peers, p)
-			gs.untagPeer(p, topic)
 			gs.addBackoff(p, topic)
 			topics := toprune[p]
 			toprune[p] = append(topics, topic)
@@ -1178,7 +1177,6 @@
 			log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
 			gs.tracer.Graft(p, topic)
 			peers[p] = struct{}{}
-			gs.tagPeer(p, topic)
 			topics := tograft[p]
 			tograft[p] = append(topics, topic)
 		}
@@ -1672,20 +1670,6 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
 	return peers
 }
 
-func (gs *GossipSubRouter) tagPeer(p peer.ID, topic string) {
-	tag := topicTag(topic)
-	gs.p.host.ConnManager().TagPeer(p, tag, 20)
-}
-
-func (gs *GossipSubRouter) untagPeer(p peer.ID, topic string) {
-	tag := topicTag(topic)
-	gs.p.host.ConnManager().UntagPeer(p, tag)
-}
-
-func topicTag(topic string) string {
-	return fmt.Sprintf("pubsub:%s", topic)
-}
-
 func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
 	pmap := make(map[peer.ID]struct{})
 	for _, p := range peers {
diff --git a/gossipsub_connmgr_test.go b/gossipsub_connmgr_test.go
new file mode 100644
index 0000000..952bf1d
--- /dev/null
+++ b/gossipsub_connmgr_test.go
@@ -0,0 +1,97 @@
+package pubsub
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	bhost "github.com/libp2p/go-libp2p-blankhost"
+	connmgr "github.com/libp2p/go-libp2p-connmgr"
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+// This file has tests for gossipsub's interaction with the libp2p connection manager.
+// We tag connections for three reasons:
+//
+// - direct peers get a `pubsub:direct` tag with a value of GossipSubConnTagValueDirectPeer
+// - mesh members get a `pubsub:$topic` tag with a value of GossipSubConnTagValueMeshPeer (default 20)
+//   - applies for each topic they're a mesh member of
+// - anyone who delivers a message first gets a bump to a decaying `pubsub:deliveries:$topic` tag
+
+func TestGossipsubConnTagDirectPeers(t *testing.T) {
+	// test that direct peers get tagged with GossipSubConnTagValueDirectPeer
+	t.Skip("coming soon")
+}
+
+func TestGossipsubConnTagMeshPeers(t *testing.T) {
+	// test that mesh peers get tagged with GossipSubConnTagValueMeshPeer
+	t.Skip("coming soon")
+}
+
+func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// set the gossipsub D parameters low, so that not all
+	// test peers will be in a mesh together
+	oldGossipSubD := GossipSubD
+	oldGossipSubDHi := GossipSubDhi
+	GossipSubD = 4
+	GossipSubDhi = 4
+	defer func() {
+		GossipSubD = oldGossipSubD
+		GossipSubDhi = oldGossipSubDHi
+	}()
+
+	decayCfg := connmgr.DecayerCfg{
+		Resolution: time.Second,
+	}
+
+	hosts := getNetHosts(t, ctx, 20, func() bhost.Option {
+		return bhost.WithConnectionManager(
+			connmgr.NewConnManager(1, 30, 10*time.Millisecond,
+				connmgr.DecayerConfig(&decayCfg)))
+	})
+
+	// use flood publishing, so non-mesh peers will still be delivering messages
+	// to everyone
+	psubs := getGossipsubs(ctx, hosts,
+		WithFloodPublish(true))
+	denseConnect(t, hosts)
+
+	// subscribe everyone to the topic
+	var msgs []*Subscription
+	for _, ps := range psubs {
+		subch, err := ps.Subscribe("foobar")
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		msgs = append(msgs, subch)
+	}
+
+	// wait a few heartbeats for meshes to form
+	time.Sleep(2 * time.Second)
+
+	// have all the hosts publish messages
+
+}
+
+func getMeshPeers(ps PubSub, topic string) []peer.ID {
+	gs := ps.rt.(*GossipSubRouter)
+
+	peerCh := make(chan peer.ID)
+	ps.eval <- func() {
+		peers := gs.mesh[topic]
+		for pid := range peers {
+			peerCh <- pid
+		}
+		close(peerCh)
+	}
+
+	var out []peer.ID
+	for pid := range peerCh {
+		out = append(out, pid)
+	}
+	return out
+}
diff --git a/pubsub.go b/pubsub.go
index 228fc1c..480de15 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -383,6 +383,18 @@ func WithEventTracer(tracer EventTracer) Option {
 	}
 }
 
+// withInternalTracer adds an internal event tracer to the pubsub system
+func withInternalTracer(tracer internalTracer) Option {
+	return func(p *PubSub) error {
+		if p.tracer != nil {
+			p.tracer.internal = append(p.tracer.internal, tracer)
+		} else {
+			p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
+		}
+		return nil
+	}
+}
+
 // WithMaxMessageSize sets the global maximum message size for pubsub wire
 // messages. The default value is 1MiB (DefaultMaxMessageSize).
 //
diff --git a/tag_tracer.go b/tag_tracer.go
new file mode 100644
index 0000000..6caa32c
--- /dev/null
+++ b/tag_tracer.go
@@ -0,0 +1,193 @@
+package pubsub
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/connmgr"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/protocol"
+)
+
+var (
+	// GossipSubConnTagValueDirectPeer is the connection manager tag value to
+	// apply to direct peers. This should be high, as we want to prioritize these
+	// connections above all others.
+	GossipSubConnTagValueDirectPeer = 1000
+
+	// GossipSubConnTagValueMeshPeer is the connection manager tag value to apply to
+	// peers in a topic mesh. If a peer is in the mesh for multiple topics, their
+	// connection will be tagged separately for each.
+	GossipSubConnTagValueMeshPeer = 20
+
+	// GossipSubConnTagBumpMessageDelivery is the amount to add to the connection manager
+	// tag that tracks message deliveries. Each time a peer is the first to deliver a
+	// message within a topic, we "bump" a tag by this amount, up to a maximum
+	// of GossipSubConnTagMessageDeliveryCap.
+	// Note that the delivery tags decay over time, decreasing by GossipSubConnTagDecayAmount
+	// at every GossipSubConnTagDecayInterval.
+	GossipSubConnTagBumpMessageDelivery = 1
+
+	// GossipSubConnTagDecayInterval is the decay interval for decaying connection manager tags.
+	GossipSubConnTagDecayInterval = time.Minute
+
+	// GossipSubConnTagDecayAmount is subtracted from decaying tag values at each decay interval.
+	GossipSubConnTagDecayAmount = 1
+
+	// GossipSubConnTagMessageDeliveryCap is the maximum value for the connection manager tags that
+	// track message deliveries.
+	GossipSubConnTagMessageDeliveryCap = 15
+)
+
+// tagTracer is an internal tracer that applies connection manager tags to peer
+// connections based on their behavior.
+//
+// We tag a peer's connections for the following reasons:
+// - Directly connected peers are tagged with GossipSubConnTagValueDirectPeer (default 1000).
+// - Mesh peers are tagged with a value of GossipSubConnTagValueMeshPeer (default 20).
+//   If a peer is in multiple topic meshes, they'll be tagged for each.
+// - For each message that we receive, we bump a delivery tag for the peer that delivered the message
+//   first.
+//   The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at
+//   a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval (default 1/minute).
+//   With the defaults, a peer who stops delivering messages will have their delivery tag decay to zero
+//   in fifteen minutes.
+type tagTracer struct {
+	sync.Mutex
+
+	cmgr     connmgr.ConnManager
+	decayer  connmgr.Decayer
+	decaying map[string]connmgr.DecayingTag
+	direct   map[peer.ID]struct{}
+}
+
+func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
+	decayer, ok := connmgr.SupportsDecay(cmgr)
+	if !ok {
+		log.Warnf("connection manager does not support decaying tags, delivery tags will not be applied")
+	}
+	return &tagTracer{
+		cmgr:     cmgr,
+		decayer:  decayer,
+		decaying: make(map[string]connmgr.DecayingTag),
+	}
+}
+
+func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
+	if t.direct == nil {
+		return
+	}
+
+	// tag peer if it is a direct peer
+	_, direct := t.direct[p]
+	if direct {
+		t.cmgr.TagPeer(p, "pubsub:direct", GossipSubConnTagValueDirectPeer)
+	}
+}
+
+func (t *tagTracer) tagMeshPeer(p peer.ID, topic string) {
+	tag := topicTag(topic)
+	t.cmgr.TagPeer(p, tag, GossipSubConnTagValueMeshPeer)
+}
+
+func (t *tagTracer) untagMeshPeer(p peer.ID, topic string) {
+	tag := topicTag(topic)
+	t.cmgr.UntagPeer(p, tag)
+}
+
+func topicTag(topic string) string {
+	return fmt.Sprintf("pubsub:%s", topic)
+}
+
+func (t *tagTracer) addDeliveryTag(topic string) {
+	if t.decayer == nil {
+		return
+	}
+
+	t.Lock()
+	defer t.Unlock()
+	tag, err := t.decayingDeliveryTag(topic)
+	if err != nil {
+		log.Warnf("unable to create decaying delivery tag: %s", err)
+		return
+	}
+	t.decaying[topic] = tag
+}
+
+func (t *tagTracer) removeDeliveryTag(topic string) {
+	t.Lock()
+	defer t.Unlock()
+	delete(t.decaying, topic)
+}
+
+func (t *tagTracer) decayingDeliveryTag(topic string) (connmgr.DecayingTag, error) {
+	name := fmt.Sprintf("pubsub-deliveries:%s", topic)
+
+	// decrement tag value by GossipSubConnTagDecayAmount at each decay interval
+	decayFn := func(value connmgr.DecayingValue) (after int, rm bool) {
+		v := value.Value - GossipSubConnTagDecayAmount
+		return v, v <= 0
+	}
+
+	// bump up to max of GossipSubConnTagMessageDeliveryCap
+	bumpFn := func(value connmgr.DecayingValue, delta int) (after int) {
+		val := value.Value + delta
+		if val > GossipSubConnTagMessageDeliveryCap {
+			return GossipSubConnTagMessageDeliveryCap
+		}
+		return val
+	}
+
+	return t.decayer.RegisterDecayingTag(name, GossipSubConnTagDecayInterval, decayFn, bumpFn)
+}
+
+func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error {
+	tag, ok := t.decaying[topic]
+	if !ok {
+		return fmt.Errorf("no decaying tag registered for topic %s", topic)
+	}
+	return tag.Bump(p, GossipSubConnTagBumpMessageDelivery)
+}
+
+func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
+	for _, topic := range msg.TopicIDs {
+		err := t.bumpDeliveryTag(p, topic)
+		if err != nil {
+			log.Warnf("error bumping delivery tag: %s", err)
+		}
+	}
+}
+
+// -- internalTracer interface methods
+var _ internalTracer = (*tagTracer)(nil)
+
+func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
+	t.tagPeerIfDirect(p)
+}
+
+func (t *tagTracer) Join(topic string) {
+	t.addDeliveryTag(topic)
+}
+
+func (t *tagTracer) DeliverMessage(msg *Message) {
+	// TODO: also give a bump to "near-first" message deliveries
+	t.bumpTagsForMessage(msg.ReceivedFrom, msg)
+}
+
+func (t *tagTracer) Leave(topic string) {
+	t.removeDeliveryTag(topic)
+}
+
+func (t *tagTracer) Graft(p peer.ID, topic string) {
+	t.tagMeshPeer(p, topic)
+}
+
+func (t *tagTracer) Prune(p peer.ID, topic string) {
+	t.untagMeshPeer(p, topic)
+}
+
+func (t *tagTracer) RemovePeer(peer.ID)             {}
+func (t *tagTracer) ValidateMessage(*Message)       {}
+func (t *tagTracer) RejectMessage(*Message, string) {}
+func (t *tagTracer) DuplicateMessage(*Message)      {}
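
Note on using this outside the tests: the delivery tags registered by tagTracer only decay if the host's connection manager implements connmgr.Decayer; newTagTracer logs a warning and skips them otherwise, while the direct-peer and mesh tags still apply. The sketch below shows one way an application might wire up a decay-capable connection manager so all three kinds of tags take effect. It is illustrative only: the libp2p.New constructor, the libp2p.ConnectionManager option, and the connection limits and decay resolution are assumptions about the surrounding libp2p APIs, not part of this change.

	package main

	import (
		"context"
		"time"

		libp2p "github.com/libp2p/go-libp2p"
		connmgr "github.com/libp2p/go-libp2p-connmgr"
		pubsub "github.com/libp2p/go-libp2p-pubsub"
	)

	func main() {
		ctx := context.Background()

		// BasicConnMgr from go-libp2p-connmgr supports decaying tags; the
		// DecayerCfg resolution controls how often decay functions run
		// (illustrative values, not taken from this change).
		decayCfg := connmgr.DecayerCfg{Resolution: time.Minute}
		cm := connmgr.NewConnManager(100, 400, time.Minute, connmgr.DecayerConfig(&decayCfg))

		h, err := libp2p.New(ctx, libp2p.ConnectionManager(cm))
		if err != nil {
			panic(err)
		}

		// NewGossipSub hooks up the tagTracer itself via withInternalTracer,
		// so no extra pubsub options are needed for tagging to take effect.
		ps, err := pubsub.NewGossipSub(ctx, h)
		if err != nil {
			panic(err)
		}
		_ = ps
	}

Because NewGossipSub appends withInternalTracer to the user-supplied options, the tag tracer coexists with any WithEventTracer the application passes.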