mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
close decaying tags when leaving topic
This commit is contained in:
parent
cd4f0a74a3
commit
b20819d046
@ -132,6 +132,14 @@ func (t *tagTracer) addDeliveryTag(topic string) {
|
||||
func (t *tagTracer) removeDeliveryTag(topic string) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
tag, ok := t.decaying[topic]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
err := tag.Close()
|
||||
if err != nil {
|
||||
log.Warnf("error closing decaying connmgr tag: %s", err)
|
||||
}
|
||||
delete(t.decaying, topic)
|
||||
}
|
||||
|
||||
|
||||
@ -111,15 +111,18 @@ func TestTagTracerDeliveryTags(t *testing.T) {
|
||||
// we have to tick the fake clock once to apply the bump
|
||||
clk.Add(time.Minute)
|
||||
|
||||
tag1 := "pubsub-deliveries:topic-1"
|
||||
tag2 := "pubsub-deliveries:topic-2"
|
||||
|
||||
// the tag value for topic-1 should be capped at GossipSubConnTagMessageDeliveryCap (default 15)
|
||||
val := getTagValue(cmgr, p, "pubsub-deliveries:topic-1")
|
||||
val := getTagValue(cmgr, p, tag1)
|
||||
expected := GossipSubConnTagMessageDeliveryCap
|
||||
if val != expected {
|
||||
t.Errorf("expected delivery tag to be capped at %d, was %d", expected, val)
|
||||
}
|
||||
|
||||
// the value for topic-2 should equal the number of messages delivered (5), since it was less than the cap
|
||||
val = getTagValue(cmgr, p, "pubsub-deliveries:topic-2")
|
||||
val = getTagValue(cmgr, p, tag2)
|
||||
expected = 5
|
||||
if val != expected {
|
||||
t.Errorf("expected delivery tag value = %d, got %d", expected, val)
|
||||
@ -128,18 +131,29 @@ func TestTagTracerDeliveryTags(t *testing.T) {
|
||||
// if we jump forward a few minutes, we should see the tags decrease by 1 / 10 minutes
|
||||
clk.Add(50 * time.Minute)
|
||||
|
||||
val = getTagValue(cmgr, p, "pubsub-deliveries:topic-1")
|
||||
val = getTagValue(cmgr, p, tag1)
|
||||
expected = GossipSubConnTagMessageDeliveryCap - 5
|
||||
if val != expected {
|
||||
t.Errorf("expected delivery tag value = %d, got %d", expected, val)
|
||||
}
|
||||
|
||||
// the tag for topic-2 should have reset to zero by now
|
||||
val = getTagValue(cmgr, p, "pubsub-deliveries:topic-2")
|
||||
val = getTagValue(cmgr, p, tag2)
|
||||
expected = 0
|
||||
if val != expected {
|
||||
t.Errorf("expected delivery tag value = %d, got %d", expected, val)
|
||||
}
|
||||
|
||||
// leaving the topic should remove the tag
|
||||
if !tagExists(cmgr, p, tag1) {
|
||||
t.Errorf("expected delivery tag %s to be applied to peer %s", tag1, p)
|
||||
}
|
||||
tt.Leave(topic1)
|
||||
// advance the real clock a bit to allow the connmgr to remove the tag async
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if tagExists(cmgr, p, tag1) {
|
||||
t.Errorf("expected delivery tag %s to be removed after leaving the topic", tag1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTagTracerDeliveryTagsNearFirst(t *testing.T) {
|
||||
@ -223,3 +237,12 @@ func getTagValue(mgr connmgri.ConnManager, p peer.ID, tag string) int {
|
||||
}
|
||||
return val
|
||||
}
|
||||
|
||||
func tagExists(mgr connmgri.ConnManager, p peer.ID, tag string) bool {
|
||||
info := mgr.GetTagInfo(p)
|
||||
if info == nil {
|
||||
return false
|
||||
}
|
||||
_, exists := info.Tags[tag]
|
||||
return exists
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user