diff --git a/pubsub.go b/pubsub.go index 8efa9dc..6f63b87 100644 --- a/pubsub.go +++ b/pubsub.go @@ -67,6 +67,9 @@ type PubSub struct { // addSub is a control channel for us to add and remove subscriptions addSub chan *addSubReq + // addRelay is a control channel for us to add and remove relays + addRelay chan *addRelayReq + // get list of topics we are subscribed to getTopics chan *topicReq @@ -97,6 +100,9 @@ type PubSub struct { // The set of topics we are subscribed to mySubs map[string]map[*Subscription]struct{} + // The set of topics we are relaying for + myRelays map[string]int + // The set of topics we are interested in myTopics map[string]*Topic @@ -210,6 +216,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), + addRelay: make(chan *addRelayReq), addTopic: make(chan *addTopicReq), rmTopic: make(chan *rmTopicReq), getTopics: make(chan *topicReq), @@ -219,6 +226,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option eval: make(chan func()), myTopics: make(map[string]*Topic), mySubs: make(map[string]map[*Subscription]struct{}), + myRelays: make(map[string]int), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), blacklist: NewMapBlacklist(), @@ -493,6 +501,8 @@ func (p *PubSub) processLoop(ctx context.Context) { p.handleRemoveSubscription(sub) case sub := <-p.addSub: p.handleAddSubscription(sub) + case relay := <-p.addRelay: + p.handleAddRelay(relay) case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -638,6 +648,25 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { req.resp <- sub } +// handleAddRelay adds a relay for a particular topic. If it is +// the first relay for the topic, it will announce that this node +// relays for the topic. +// Only called from processLoop. +func (p *PubSub) handleAddRelay(req *addRelayReq) { + relays := p.myRelays[req.topic] + + // announce we want this topic + if relays == 0 { + p.disc.Advertise(req.topic) + p.announce(req.topic, true) + p.rt.Join(req.topic) + } + + p.myRelays[req.topic]++ + + req.resp <- nil +} + // announce announces whether or not this node is interested in a given topic // Only called from processLoop. func (p *PubSub) announce(topic string, sub bool) { @@ -1070,3 +1099,8 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error { } return <-rmVal.resp } + +type addRelayReq struct { + topic string + resp chan error +} diff --git a/topic.go b/topic.go index 1b57c86..e684340 100644 --- a/topic.go +++ b/topic.go @@ -121,6 +121,32 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { return <-out, nil } +// Relay enables message relaying for the topic. Subsequent calls increase +// the reference counter. To completely disable the relay, all references +// must be revoked. +func (t *Topic) Relay() error { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return ErrTopicClosed + } + + out := make(chan error, 1) + + t.p.disc.Discover(t.topic) + + select { + case t.p.addRelay <- &addRelayReq{ + topic: t.topic, + resp: out, + }: + case <-t.p.ctx.Done(): + return t.p.ctx.Err() + } + + return <-out +} + // RouterReady is a function that decides if a router is ready to publish type RouterReady func(rt PubSubRouter, topic string) (bool, error)