Merge pull request #125 from libp2p/fix/announce-retry

fix announce retry logic
This commit is contained in:
Steven Allen 2018-10-29 06:30:45 -07:00 committed by GitHub
commit ec18566a0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -407,18 +407,18 @@ func (p *PubSub) announce(topic string, sub bool) {
case peer <- out:
default:
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
go p.announceRetry(topic, sub)
go p.announceRetry(pid, topic, sub)
}
}
}
func (p *PubSub) announceRetry(topic string, sub bool) {
func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
retry := func() {
_, ok := p.myTopics[topic]
if (ok && sub) || (!ok && !sub) {
p.announce(topic, sub)
p.doAnnounceRetry(pid, topic, sub)
}
}
@ -428,6 +428,26 @@ func (p *PubSub) announceRetry(topic string, sub bool) {
}
}
func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
peer, ok := p.peers[pid]
if !ok {
return
}
subopt := &pb.RPC_SubOpts{
Topicid: &topic,
Subscribe: &sub,
}
out := rpcWithSubs(subopt)
select {
case peer <- out:
default:
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
go p.announceRetry(pid, topic, sub)
}
}
// notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) {