Topic relay cancelling

Implemented relay reference
cancel logic.
This commit is contained in:
Lukasz Zimnoch 2020-04-29 13:33:09 +02:00 committed by vyzo
parent 750cc66336
commit af44f7a07d
2 changed files with 42 additions and 10 deletions

View File

@ -70,6 +70,9 @@ type PubSub struct {
// addRelay is a control channel for us to add and remove relays // addRelay is a control channel for us to add and remove relays
addRelay chan *addRelayReq addRelay chan *addRelayReq
// rmRelay is a relay cancellation channel
rmRelay chan string
// get list of topics we are subscribed to // get list of topics we are subscribed to
getTopics chan *topicReq getTopics chan *topicReq
@ -217,6 +220,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
getPeers: make(chan *listPeerReq), getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq), addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq), addRelay: make(chan *addRelayReq),
rmRelay: make(chan string),
addTopic: make(chan *addTopicReq), addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq), rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq), getTopics: make(chan *topicReq),
@ -503,6 +507,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleAddSubscription(sub) p.handleAddSubscription(sub)
case relay := <-p.addRelay: case relay := <-p.addRelay:
p.handleAddRelay(relay) p.handleAddRelay(relay)
case topic := <-p.rmRelay:
p.handleRemoveRelay(topic)
case preq := <-p.getPeers: case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic] tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok { if preq.topic != "" && !ok {
@ -664,7 +670,31 @@ func (p *PubSub) handleAddRelay(req *addRelayReq) {
p.myRelays[req.topic]++ p.myRelays[req.topic]++
req.resp <- nil req.resp <- func() {
select {
case p.rmRelay <- req.topic:
case <-p.ctx.Done():
}
}
}
// handleRemoveRelay removes one relay reference from bookkeeping.
// If this was the last relay reference for a given topic, it will also
// announce that this node is not relaying for this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveRelay(topic string) {
if p.myRelays[topic] == 0 {
return
}
p.myRelays[topic]--
if p.myRelays[topic] == 0 {
delete(p.myRelays, topic)
p.disc.StopAdvertise(topic)
p.announce(topic, false)
p.rt.Leave(topic)
}
} }
// announce announces whether or not this node is interested in a given topic // announce announces whether or not this node is interested in a given topic
@ -1100,7 +1130,9 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error {
return <-rmVal.resp return <-rmVal.resp
} }
type RelayCancelFunc func()
type addRelayReq struct { type addRelayReq struct {
topic string topic string
resp chan error resp chan RelayCancelFunc
} }

View File

@ -121,17 +121,17 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
return <-out, nil return <-out, nil
} }
// Relay enables message relaying for the topic. Subsequent calls increase // Relay enables message relaying for the topic and returns a reference
// the reference counter. To completely disable the relay, all references // cancel function. Subsequent calls increase the reference counter.
// must be revoked. // To completely disable the relay, all references must be cancelled.
func (t *Topic) Relay() error { func (t *Topic) Relay() (RelayCancelFunc, error) {
t.mux.RLock() t.mux.RLock()
defer t.mux.RUnlock() defer t.mux.RUnlock()
if t.closed { if t.closed {
return ErrTopicClosed return nil, ErrTopicClosed
} }
out := make(chan error, 1) out := make(chan RelayCancelFunc, 1)
t.p.disc.Discover(t.topic) t.p.disc.Discover(t.topic)
@ -141,10 +141,10 @@ func (t *Topic) Relay() error {
resp: out, resp: out,
}: }:
case <-t.p.ctx.Done(): case <-t.p.ctx.Done():
return t.p.ctx.Err() return nil, t.p.ctx.Err()
} }
return <-out return <-out, nil
} }
// RouterReady is a function that decides if a router is ready to publish // RouterReady is a function that decides if a router is ready to publish