diff --git a/tag_tracer.go b/tag_tracer.go index 021b6e8..6b57a4f 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -132,6 +132,14 @@ func (t *tagTracer) addDeliveryTag(topic string) { func (t *tagTracer) removeDeliveryTag(topic string) { t.Lock() defer t.Unlock() + tag, ok := t.decaying[topic] + if !ok { + return + } + err := tag.Close() + if err != nil { + log.Warnf("error closing decaying connmgr tag: %s", err) + } delete(t.decaying, topic) } diff --git a/tag_tracer_test.go b/tag_tracer_test.go index b664945..a300974 100644 --- a/tag_tracer_test.go +++ b/tag_tracer_test.go @@ -111,15 +111,18 @@ func TestTagTracerDeliveryTags(t *testing.T) { // we have to tick the fake clock once to apply the bump clk.Add(time.Minute) + tag1 := "pubsub-deliveries:topic-1" + tag2 := "pubsub-deliveries:topic-2" + // the tag value for topic-1 should be capped at GossipSubConnTagMessageDeliveryCap (default 15) - val := getTagValue(cmgr, p, "pubsub-deliveries:topic-1") + val := getTagValue(cmgr, p, tag1) expected := GossipSubConnTagMessageDeliveryCap if val != expected { t.Errorf("expected delivery tag to be capped at %d, was %d", expected, val) } // the value for topic-2 should equal the number of messages delivered (5), since it was less than the cap - val = getTagValue(cmgr, p, "pubsub-deliveries:topic-2") + val = getTagValue(cmgr, p, tag2) expected = 5 if val != expected { t.Errorf("expected delivery tag value = %d, got %d", expected, val) @@ -128,18 +131,29 @@ func TestTagTracerDeliveryTags(t *testing.T) { // if we jump forward a few minutes, we should see the tags decrease by 1 / 10 minutes clk.Add(50 * time.Minute) - val = getTagValue(cmgr, p, "pubsub-deliveries:topic-1") + val = getTagValue(cmgr, p, tag1) expected = GossipSubConnTagMessageDeliveryCap - 5 if val != expected { t.Errorf("expected delivery tag value = %d, got %d", expected, val) } // the tag for topic-2 should have reset to zero by now - val = getTagValue(cmgr, p, "pubsub-deliveries:topic-2") + val = getTagValue(cmgr, p, tag2) expected = 0 if val != expected { t.Errorf("expected delivery tag value = %d, got %d", expected, val) } + + // leaving the topic should remove the tag + if !tagExists(cmgr, p, tag1) { + t.Errorf("expected delivery tag %s to be applied to peer %s", tag1, p) + } + tt.Leave(topic1) + // advance the real clock a bit to allow the connmgr to remove the tag async + time.Sleep(500 * time.Millisecond) + if tagExists(cmgr, p, tag1) { + t.Errorf("expected delivery tag %s to be removed after leaving the topic", tag1) + } } func TestTagTracerDeliveryTagsNearFirst(t *testing.T) { @@ -223,3 +237,12 @@ func getTagValue(mgr connmgri.ConnManager, p peer.ID, tag string) int { } return val } + +func tagExists(mgr connmgri.ConnManager, p peer.ID, tag string) bool { + info := mgr.GetTagInfo(p) + if info == nil { + return false + } + _, exists := info.Tags[tag] + return exists +}