fix announce retry logic

This commit is contained in:
vyzo 2018-10-29 11:10:31 +02:00
parent a426c572ad
commit 575cf339f4

View File

@ -407,18 +407,18 @@ func (p *PubSub) announce(topic string, sub bool) {
case peer <- out: case peer <- out:
default: default:
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
go p.announceRetry(topic, sub) go p.announceRetry(pid, topic, sub)
} }
} }
} }
func (p *PubSub) announceRetry(topic string, sub bool) { func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
retry := func() { retry := func() {
_, ok := p.myTopics[topic] _, ok := p.myTopics[topic]
if (ok && sub) || (!ok && !sub) { if (ok && sub) || (!ok && !sub) {
p.announce(topic, sub) p.doAnnounceRetry(pid, topic, sub)
} }
} }
@ -428,6 +428,31 @@ func (p *PubSub) announceRetry(topic string, sub bool) {
} }
} }
func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
peer, ok := p.peers[pid]
if !ok {
return
}
// don't take pointers to the goroutine stack
topicP := new(string)
subP := new(bool)
*topicP = topic
*subP = sub
subopt := &pb.RPC_SubOpts{
Topicid: topicP,
Subscribe: subP,
}
out := rpcWithSubs(subopt)
select {
case peer <- out:
default:
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
go p.announceRetry(pid, topic, sub)
}
}
// notifySubs sends a given message to all corresponding subscribers. // notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop. // Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) { func (p *PubSub) notifySubs(msg *pb.Message) {