From af44f7a07d24a867e63f92b849a2b232a28f64a5 Mon Sep 17 00:00:00 2001 From: Lukasz Zimnoch Date: Wed, 29 Apr 2020 13:33:09 +0200 Subject: [PATCH] Topic relay cancelling Implemented relay reference cancel logic. --- pubsub.go | 36 ++++++++++++++++++++++++++++++++++-- topic.go | 16 ++++++++-------- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/pubsub.go b/pubsub.go index 6f63b87..600f67c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -70,6 +70,9 @@ type PubSub struct { // addRelay is a control channel for us to add and remove relays addRelay chan *addRelayReq + // rmRelay is a relay cancellation channel + rmRelay chan string + // get list of topics we are subscribed to getTopics chan *topicReq @@ -217,6 +220,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), addRelay: make(chan *addRelayReq), + rmRelay: make(chan string), addTopic: make(chan *addTopicReq), rmTopic: make(chan *rmTopicReq), getTopics: make(chan *topicReq), @@ -503,6 +507,8 @@ func (p *PubSub) processLoop(ctx context.Context) { p.handleAddSubscription(sub) case relay := <-p.addRelay: p.handleAddRelay(relay) + case topic := <-p.rmRelay: + p.handleRemoveRelay(topic) case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -664,7 +670,31 @@ func (p *PubSub) handleAddRelay(req *addRelayReq) { p.myRelays[req.topic]++ - req.resp <- nil + req.resp <- func() { + select { + case p.rmRelay <- req.topic: + case <-p.ctx.Done(): + } + } +} + +// handleRemoveRelay removes one relay reference from bookkeeping. +// If this was the last relay reference for a given topic, it will also +// announce that this node is not relaying for this topic anymore. +// Only called from processLoop. +func (p *PubSub) handleRemoveRelay(topic string) { + if p.myRelays[topic] == 0 { + return + } + + p.myRelays[topic]-- + + if p.myRelays[topic] == 0 { + delete(p.myRelays, topic) + p.disc.StopAdvertise(topic) + p.announce(topic, false) + p.rt.Leave(topic) + } } // announce announces whether or not this node is interested in a given topic @@ -1100,7 +1130,9 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error { return <-rmVal.resp } +type RelayCancelFunc func() + type addRelayReq struct { topic string - resp chan error + resp chan RelayCancelFunc } diff --git a/topic.go b/topic.go index e684340..dcfb062 100644 --- a/topic.go +++ b/topic.go @@ -121,17 +121,17 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { return <-out, nil } -// Relay enables message relaying for the topic. Subsequent calls increase -// the reference counter. To completely disable the relay, all references -// must be revoked. -func (t *Topic) Relay() error { +// Relay enables message relaying for the topic and returns a reference +// cancel function. Subsequent calls increase the reference counter. +// To completely disable the relay, all references must be cancelled. +func (t *Topic) Relay() (RelayCancelFunc, error) { t.mux.RLock() defer t.mux.RUnlock() if t.closed { - return ErrTopicClosed + return nil, ErrTopicClosed } - out := make(chan error, 1) + out := make(chan RelayCancelFunc, 1) t.p.disc.Discover(t.topic) @@ -141,10 +141,10 @@ func (t *Topic) Relay() error { resp: out, }: case <-t.p.ctx.Done(): - return t.p.ctx.Err() + return nil, t.p.ctx.Err() } - return <-out + return <-out, nil } // RouterReady is a function that decides if a router is ready to publish