Topic relay support

Implemented Relay method which
allows to enable message relaying
for the given topic.
This commit is contained in:
Lukasz Zimnoch 2020-04-29 13:05:31 +02:00 committed by vyzo
parent 27b987071d
commit 750cc66336
2 changed files with 60 additions and 0 deletions

View File

@ -67,6 +67,9 @@ type PubSub struct {
// addSub is a control channel for us to add and remove subscriptions // addSub is a control channel for us to add and remove subscriptions
addSub chan *addSubReq addSub chan *addSubReq
// addRelay is a control channel for us to add and remove relays
addRelay chan *addRelayReq
// get list of topics we are subscribed to // get list of topics we are subscribed to
getTopics chan *topicReq getTopics chan *topicReq
@ -97,6 +100,9 @@ type PubSub struct {
// The set of topics we are subscribed to // The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{} mySubs map[string]map[*Subscription]struct{}
// The set of topics we are relaying for
myRelays map[string]int
// The set of topics we are interested in // The set of topics we are interested in
myTopics map[string]*Topic myTopics map[string]*Topic
@ -210,6 +216,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
cancelCh: make(chan *Subscription), cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq), getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq), addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq),
addTopic: make(chan *addTopicReq), addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq), rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq), getTopics: make(chan *topicReq),
@ -219,6 +226,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
eval: make(chan func()), eval: make(chan func()),
myTopics: make(map[string]*Topic), myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}), mySubs: make(map[string]map[*Subscription]struct{}),
myRelays: make(map[string]int),
topics: make(map[string]map[peer.ID]struct{}), topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC), peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(), blacklist: NewMapBlacklist(),
@ -493,6 +501,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleRemoveSubscription(sub) p.handleRemoveSubscription(sub)
case sub := <-p.addSub: case sub := <-p.addSub:
p.handleAddSubscription(sub) p.handleAddSubscription(sub)
case relay := <-p.addRelay:
p.handleAddRelay(relay)
case preq := <-p.getPeers: case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic] tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok { if preq.topic != "" && !ok {
@ -638,6 +648,25 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
req.resp <- sub req.resp <- sub
} }
// handleAddRelay adds a relay for a particular topic. If it is
// the first relay for the topic, it will announce that this node
// relays for the topic.
// Only called from processLoop.
func (p *PubSub) handleAddRelay(req *addRelayReq) {
relays := p.myRelays[req.topic]
// announce we want this topic
if relays == 0 {
p.disc.Advertise(req.topic)
p.announce(req.topic, true)
p.rt.Join(req.topic)
}
p.myRelays[req.topic]++
req.resp <- nil
}
// announce announces whether or not this node is interested in a given topic // announce announces whether or not this node is interested in a given topic
// Only called from processLoop. // Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) { func (p *PubSub) announce(topic string, sub bool) {
@ -1070,3 +1099,8 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error {
} }
return <-rmVal.resp return <-rmVal.resp
} }
type addRelayReq struct {
topic string
resp chan error
}

View File

@ -121,6 +121,32 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
return <-out, nil return <-out, nil
} }
// Relay enables message relaying for the topic. Subsequent calls increase
// the reference counter. To completely disable the relay, all references
// must be revoked.
func (t *Topic) Relay() error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return ErrTopicClosed
}
out := make(chan error, 1)
t.p.disc.Discover(t.topic)
select {
case t.p.addRelay <- &addRelayReq{
topic: t.topic,
resp: out,
}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}
return <-out
}
// RouterReady is a function that decides if a router is ready to publish // RouterReady is a function that decides if a router is ready to publish
type RouterReady func(rt PubSubRouter, topic string) (bool, error) type RouterReady func(rt PubSubRouter, topic string) (bool, error)