From 575cf339f4304c1d58614ba8e98daa2990304be3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 29 Oct 2018 11:10:31 +0200 Subject: [PATCH 1/2] fix announce retry logic --- pubsub.go | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/pubsub.go b/pubsub.go index 6dbf0a0..2e48c5b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -407,18 +407,18 @@ func (p *PubSub) announce(topic string, sub bool) { case peer <- out: default: log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) - go p.announceRetry(topic, sub) + go p.announceRetry(pid, topic, sub) } } } -func (p *PubSub) announceRetry(topic string, sub bool) { +func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) { time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) retry := func() { _, ok := p.myTopics[topic] if (ok && sub) || (!ok && !sub) { - p.announce(topic, sub) + p.doAnnounceRetry(pid, topic, sub) } } @@ -428,6 +428,31 @@ func (p *PubSub) announceRetry(topic string, sub bool) { } } +func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { + peer, ok := p.peers[pid] + if !ok { + return + } + + // don't take pointers to the goroutine stack + topicP := new(string) + subP := new(bool) + *topicP = topic + *subP = sub + subopt := &pb.RPC_SubOpts{ + Topicid: topicP, + Subscribe: subP, + } + + out := rpcWithSubs(subopt) + select { + case peer <- out: + default: + log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + go p.announceRetry(pid, topic, sub) + } +} + // notifySubs sends a given message to all corresponding subscribers. // Only called from processLoop. func (p *PubSub) notifySubs(msg *pb.Message) { From 86e65e589d469bcdf87c84e6b5d47744cfbc8d2d Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 29 Oct 2018 12:38:03 +0200 Subject: [PATCH 2/2] don't stand on your head to take a pointer the whole stack argument doesn't make much sense in golang; escape analysis should allocate in heap. --- pubsub.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pubsub.go b/pubsub.go index 2e48c5b..d90249f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -434,14 +434,9 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { return } - // don't take pointers to the goroutine stack - topicP := new(string) - subP := new(bool) - *topicP = topic - *subP = sub subopt := &pb.RPC_SubOpts{ - Topicid: topicP, - Subscribe: subP, + Topicid: &topic, + Subscribe: &sub, } out := rpcWithSubs(subopt)