diff --git a/comm.go b/comm.go index ec67d32..24ad6f9 100644 --- a/comm.go +++ b/comm.go @@ -19,7 +19,18 @@ import ( // get the initial RPC containing all of our subscriptions to send to new peers func (p *PubSub) getHelloPacket() *RPC { var rpc RPC + + subscriptions := make(map[string]bool) + for t := range p.mySubs { + subscriptions[t] = true + } + + for t := range p.myRelays { + subscriptions[t] = true + } + + for t := range subscriptions { as := &pb.RPC_SubOpts{ Topicid: proto.String(t), Subscribe: proto.Bool(true), diff --git a/pubsub.go b/pubsub.go index 600f67c..3ac27fb 100644 --- a/pubsub.go +++ b/pubsub.go @@ -595,7 +595,9 @@ func (p *PubSub) handleRemoveTopic(req *rmTopicReq) { return } - if len(topic.evtHandlers) == 0 && len(p.mySubs[req.topic.topic]) == 0 { + if len(topic.evtHandlers) == 0 && + len(p.mySubs[req.topic.topic]) == 0 && + p.myRelays[req.topic.topic] == 0 { delete(p.myTopics, topic.topic) req.resp <- nil return @@ -621,6 +623,10 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { if len(subs) == 0 { delete(p.mySubs, sub.topic) + } + + // stop announcing only if there are no more subs and relays + if len(subs) == 0 && p.myRelays[sub.topic] == 0 { p.disc.StopAdvertise(sub.topic) p.announce(sub.topic, false) p.rt.Leave(sub.topic) @@ -635,8 +641,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { sub := req.sub subs := p.mySubs[sub.topic] - // announce we want this topic - if len(subs) == 0 { + // announce we want this topic if neither subs nor relays exist so far + if len(subs) == 0 && p.myRelays[sub.topic] == 0 { p.disc.Advertise(sub.topic) p.announce(sub.topic, true) p.rt.Join(sub.topic) @@ -659,20 +665,20 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { // relays for the topic. // Only called from processLoop. func (p *PubSub) handleAddRelay(req *addRelayReq) { - relays := p.myRelays[req.topic] + topic := req.topic - // announce we want this topic - if relays == 0 { - p.disc.Advertise(req.topic) - p.announce(req.topic, true) - p.rt.Join(req.topic) + // announce we want this topic if neither relays nor subs exist so far + if p.myRelays[topic] == 0 && len(p.mySubs[topic]) == 0 { + p.disc.Advertise(topic) + p.announce(topic, true) + p.rt.Join(topic) } - p.myRelays[req.topic]++ + p.myRelays[topic]++ req.resp <- func() { select { - case p.rmRelay <- req.topic: + case p.rmRelay <- topic: case <-p.ctx.Done(): } } @@ -691,6 +697,10 @@ func (p *PubSub) handleRemoveRelay(topic string) { if p.myRelays[topic] == 0 { delete(p.myRelays, topic) + } + + // stop announcing only if there are no more relays and subs + if p.myRelays[topic] == 0 && len(p.mySubs[topic]) == 0 { p.disc.StopAdvertise(topic) p.announce(topic, false) p.rt.Leave(topic) @@ -722,7 +732,11 @@ func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) { time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) retry := func() { - _, ok := p.mySubs[topic] + _, okSubs := p.mySubs[topic] + _, okRelays := p.myRelays[topic] + + ok := okSubs || okRelays + if (ok && sub) || (!ok && !sub) { p.doAnnounceRetry(pid, topic, sub) }