diff --git a/pubsub.go b/pubsub.go index 50bb2a1..ce57237 100644 --- a/pubsub.go +++ b/pubsub.go @@ -676,12 +676,22 @@ func (p *PubSub) handleAddRelay(req *addRelayReq) { p.rt.Join(topic) } - req.resp <- func() { + // flag used to prevent calling cancel function multiple times + isCancelled := false + + relayCancelFunc := func() { + if isCancelled { + return + } + select { case p.rmRelay <- topic: + isCancelled = true case <-p.ctx.Done(): } } + + req.resp <- relayCancelFunc } // handleRemoveRelay removes one relay reference from bookkeeping. diff --git a/topic_test.go b/topic_test.go index a3f2e7a..407998c 100644 --- a/topic_test.go +++ b/topic_test.go @@ -629,7 +629,20 @@ func TestTopicRelayReuse(t *testing.T) { t.Fatal("incorrect number of relays") } + // only the first invocation should take effect relay1Cancel() + relay1Cancel() + relay1Cancel() + + pubsubs[0].eval <- func() { + res <- pubsubs[0].myRelays[topic] == 2 + } + + isCorrectNumber = <-res + if !isCorrectNumber { + t.Fatal("incorrect number of relays") + } + relay2Cancel() relay3Cancel()