diff --git a/gossipsub.go b/gossipsub.go
index a9accd1..cbe6de6 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -328,6 +328,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
 	// and the gossip tracing
 	gs.gossipTracer.Start(gs)
 
+	// and the tracer for connmgr tags
+	gs.tagTracer.Start(gs)
+
 	// start using the same msg ID function as PubSub for caching messages.
 	gs.mcache.SetMsgIdFn(p.msgID)
 
diff --git a/tag_tracer.go b/tag_tracer.go
index fe81e72..2f4804e 100644
--- a/tag_tracer.go
+++ b/tag_tracer.go
@@ -1,6 +1,7 @@
 package pubsub
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -55,9 +56,14 @@ type tagTracer struct {
 	sync.RWMutex
 
 	cmgr     connmgr.ConnManager
+	msgID    MsgIdFunction
 	decayer  connmgr.Decayer
 	decaying map[string]connmgr.DecayingTag
 	direct   map[peer.ID]struct{}
+
+	// track message deliveries to reward "near first" deliveries
+	// (a delivery that occurs while we're still validating the message)
+	deliveries *messageDeliveries
 }
 
 func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
@@ -66,12 +72,49 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
 		log.Warnf("connection manager does not support decaying tags, delivery tags will not be applied")
 	}
 	return &tagTracer{
-		cmgr:     cmgr,
-		decayer:  decayer,
-		decaying: make(map[string]connmgr.DecayingTag),
+		cmgr:       cmgr,
+		msgID:      DefaultMsgIdFn,
+		decayer:    decayer,
+		decaying:   make(map[string]connmgr.DecayingTag),
+		deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)},
 	}
 }
 
+// Start binds the tracer to the router: it adopts the message ID function of the
+// router's PubSub instance and the router's direct peer set, then launches the
+// background loop. It is a no-op when the tracer is nil.
+func (t *tagTracer) Start(gs *GossipSubRouter) {
+	if t == nil {
+		return
+	}
+
+	t.msgID = gs.p.msgID
+	t.direct = gs.direct
+	go t.background(gs.p.ctx)
+}
+
+// background garbage collects the delivery records once a minute until ctx is done.
+func (t *tagTracer) background(ctx context.Context) {
+	gcDeliveryRecords := time.NewTicker(time.Minute)
+	defer gcDeliveryRecords.Stop()
+
+	for {
+		select {
+		case <-gcDeliveryRecords.C:
+			t.gcDeliveryRecords()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// gcDeliveryRecords runs a GC pass over the delivery records under the tracer lock.
+func (t *tagTracer) gcDeliveryRecords() {
+	t.Lock()
+	defer t.Unlock()
+	t.deliveries.gc()
+}
+
 func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
 	if t.direct == nil {
 		return
@@ -163,6 +206,36 @@ func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
 	}
 }
 
+// nearFirstPeers marks the message's delivery record as valid and returns the peers
+// (other than msg.ReceivedFrom) who delivered the message while it was still validating.
+func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
+	t.Lock()
+	defer t.Unlock()
+	drec := t.deliveries.getRecord(t.msgID(msg.Message))
+	nearFirstPeers := make([]peer.ID, 0, len(drec.peers))
+	// defensive check that this is the first delivery trace -- delivery status should be unknown
+	if drec.status != deliveryUnknown {
+		log.Warnf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(drec.firstSeen), drec.status)
+		return nearFirstPeers
+	}
+
+	drec.status = deliveryValid
+	drec.validated = time.Now()
+
+	for p := range drec.peers {
+		// this check is to make sure a peer can't send us a message twice and get a double count
+		// if it is a first delivery.
+		if p != msg.ReceivedFrom {
+			nearFirstPeers = append(nearFirstPeers, p)
+		}
+	}
+
+	// we're done with the peers map and can reclaim the memory
+	drec.peers = nil
+
+	return nearFirstPeers
+}
+
 // -- internalTracer interface methods
 var _ internalTracer = (*tagTracer)(nil)
 
@@ -175,8 +248,13 @@ func (t *tagTracer) Join(topic string) {
 }
 
+// DeliverMessage bumps the delivery tags of the first deliverer and of every near-first peer.
 func (t *tagTracer) DeliverMessage(msg *Message) {
-	// TODO: also give a bump to "near-first" message deliveries
+	nearFirst := t.nearFirstPeers(msg)
+
 	t.bumpTagsForMessage(msg.ReceivedFrom, msg)
+	for _, p := range nearFirst {
+		t.bumpTagsForMessage(p, msg)
+	}
 }
 
 func (t *tagTracer) Leave(topic string) {
@@ -191,7 +269,47 @@ func (t *tagTracer) Prune(p peer.ID, topic string) {
 	t.untagMeshPeer(p, topic)
 }
 
-func (t *tagTracer) RemovePeer(peer.ID)             {}
-func (t *tagTracer) ValidateMessage(*Message)       {}
-func (t *tagTracer) RejectMessage(*Message, string) {}
-func (t *tagTracer) DuplicateMessage(*Message)      {}
+// ValidateMessage ensures a delivery record exists for the message.
+func (t *tagTracer) ValidateMessage(msg *Message) {
+	t.Lock()
+	defer t.Unlock()
+
+	// create a delivery record for the message
+	_ = t.deliveries.getRecord(t.msgID(msg.Message))
+}
+
+// DuplicateMessage records the sender of a duplicate as a near-first candidate, but only
+// while the delivery status of the message is still unknown.
+func (t *tagTracer) DuplicateMessage(msg *Message) {
+	t.Lock()
+	defer t.Unlock()
+
+	drec := t.deliveries.getRecord(t.msgID(msg.Message))
+	if drec.status == deliveryUnknown {
+		// the message is being validated; track the peer delivery and wait for
+		// the Deliver/Reject notification.
+		drec.peers[msg.ReceivedFrom] = struct{}{}
+	}
+}
+
+// RejectMessage marks the delivery record as invalid and drops the tracked peers.
+// The rejection reason is not used.
+func (t *tagTracer) RejectMessage(msg *Message, reason string) {
+	t.Lock()
+	defer t.Unlock()
+
+	// mark message as invalid and release tracking info
+	drec := t.deliveries.getRecord(t.msgID(msg.Message))
+
+	// defensive check that this is the first rejection trace -- delivery status should be unknown
+	if drec.status != deliveryUnknown {
+		log.Warnf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(drec.firstSeen), drec.status)
+		return
+	}
+
+	drec.status = deliveryInvalid
+	drec.peers = nil
+}
+
+// RemovePeer is a no-op for the tag tracer.
+func (t *tagTracer) RemovePeer(peer.ID) {}
diff --git a/tag_tracer_test.go b/tag_tracer_test.go
index e27f5a5..23c4840 100644
--- a/tag_tracer_test.go
+++ b/tag_tracer_test.go
@@ -1,6 +1,7 @@
 package pubsub
 
 import (
+	"fmt"
 	"github.com/benbjohnson/clock"
 	connmgr "github.com/libp2p/go-libp2p-connmgr"
 	connmgri "github.com/libp2p/go-libp2p-core/connmgr"
@@ -139,6 +140,79 @@ func TestTagTracerDeliveryTags(t *testing.T) {
 	}
 }
 
+// TestTagTracerDeliveryTagsNearFirst checks that a peer whose duplicate arrives while the
+// message is still validating gets the same delivery credit as the first deliverer, and
+// that a duplicate arriving after delivery earns nothing.
+func TestTagTracerDeliveryTagsNearFirst(t *testing.T) {
+	// use fake time to test the tag decay
+	clk := clock.NewMock()
+	decayCfg := &connmgr.DecayerCfg{
+		Clock:      clk,
+		Resolution: time.Minute,
+	}
+	cmgr := connmgr.NewConnManager(5, 10, time.Minute, connmgr.DecayerConfig(decayCfg))
+
+	tt := newTagTracer(cmgr)
+
+	topic := "test"
+
+	p := peer.ID("a-peer")
+	p2 := peer.ID("another-peer")
+	p3 := peer.ID("slow-peer")
+
+	tt.Join(topic)
+
+	for i := 0; i < GossipSubConnTagMessageDeliveryCap+5; i++ {
+		topics := []string{topic}
+		msg := &Message{
+			ReceivedFrom: p,
+			Message: &pb.Message{
+				From:     []byte(p),
+				Data:     []byte(fmt.Sprintf("msg-%d", i)),
+				TopicIDs: topics,
+				Seqno:    []byte(fmt.Sprintf("%d", i)),
+			},
+		}
+
+		// a duplicate of the message, received from p2
+		dup := &Message{
+			ReceivedFrom: p2,
+			Message:      msg.Message,
+		}
+
+		// the message starts validating as soon as we receive it from p
+		tt.ValidateMessage(msg)
+		// p2 should get near-first credit for the duplicate message that arrives before
+		// validation is complete
+		tt.DuplicateMessage(dup)
+		// DeliverMessage gets called when validation is complete
+		tt.DeliverMessage(msg)
+
+		// p3 delivers a duplicate after validation completes & gets no credit
+		dup.ReceivedFrom = p3
+		tt.DuplicateMessage(dup)
+	}
+
+	clk.Add(time.Minute)
+
+	// both p and p2 should get delivery tags equal to the cap
+	tag := "pubsub-deliveries:test"
+	val := getTagValue(cmgr, p, tag)
+	if val != GossipSubConnTagMessageDeliveryCap {
+		t.Errorf("expected tag %s to have val %d, was %d", tag, GossipSubConnTagMessageDeliveryCap, val)
+	}
+	val = getTagValue(cmgr, p2, tag)
+	if val != GossipSubConnTagMessageDeliveryCap {
+		t.Errorf("expected tag %s for near-first peer to have val %d, was %d", tag, GossipSubConnTagMessageDeliveryCap, val)
+	}
+
+	// p3 should have no delivery tag credit
+	val = getTagValue(cmgr, p3, tag)
+	if val != 0 {
+		t.Errorf("expected tag %s for slow peer to have val %d, was %d", tag, 0, val)
+	}
+}
+
 func getTagValue(mgr connmgri.ConnManager, p peer.ID, tag string) int {
 	info := mgr.GetTagInfo(p)
 	if info == nil {