diff --git a/pubsub.go b/pubsub.go index 6dbf0a0..d90249f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -407,18 +407,18 @@ func (p *PubSub) announce(topic string, sub bool) { case peer <- out: default: log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) - go p.announceRetry(topic, sub) + go p.announceRetry(pid, topic, sub) } } } -func (p *PubSub) announceRetry(topic string, sub bool) { +func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) { time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) retry := func() { _, ok := p.myTopics[topic] if (ok && sub) || (!ok && !sub) { - p.announce(topic, sub) + p.doAnnounceRetry(pid, topic, sub) } } @@ -428,6 +428,26 @@ func (p *PubSub) announceRetry(topic string, sub bool) { } } +func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { + peer, ok := p.peers[pid] + if !ok { + return + } + + subopt := &pb.RPC_SubOpts{ + Topicid: &topic, + Subscribe: &sub, + } + + out := rpcWithSubs(subopt) + select { + case peer <- out: + default: + log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + go p.announceRetry(pid, topic, sub) + } +} + // notifySubs sends a given message to all corresponding subscribers. // Only called from processLoop. func (p *PubSub) notifySubs(msg *pb.Message) {