diff --git a/pubsub.go b/pubsub.go index 3d0b3e8..f739c3a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "fmt" + "math/rand" "sync/atomic" "time" @@ -330,9 +331,18 @@ func (p *PubSub) announce(topic string, sub bool) { select { case peer <- out: default: - // TODO this needs to be reliable, schedule it for piggybacking - // in a subsequent message or retry later - log.Infof("dropping announce message to peer %s: queue full", pid) + log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + go p.announceRetry(topic, sub) + } + } +} + +func (p *PubSub) announceRetry(topic string, sub bool) { + time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) + p.eval <- func() { + _, ok := p.myTopics[topic] + if (ok && sub) || (!ok && !sub) { + p.announce(topic, sub) } } }