Topic relay discovery
Incorporated relays to exiting discovery logic and added them to topic before-remove guard.
This commit is contained in:
parent
af44f7a07d
commit
0d884f3e90
11
comm.go
11
comm.go
|
@ -19,7 +19,18 @@ import (
|
|||
// get the initial RPC containing all of our subscriptions to send to new peers
|
||||
func (p *PubSub) getHelloPacket() *RPC {
|
||||
var rpc RPC
|
||||
|
||||
subscriptions := make(map[string]bool)
|
||||
|
||||
for t := range p.mySubs {
|
||||
subscriptions[t] = true
|
||||
}
|
||||
|
||||
for t := range p.myRelays {
|
||||
subscriptions[t] = true
|
||||
}
|
||||
|
||||
for t := range subscriptions {
|
||||
as := &pb.RPC_SubOpts{
|
||||
Topicid: proto.String(t),
|
||||
Subscribe: proto.Bool(true),
|
||||
|
|
38
pubsub.go
38
pubsub.go
|
@ -595,7 +595,9 @@ func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
|
|||
return
|
||||
}
|
||||
|
||||
if len(topic.evtHandlers) == 0 && len(p.mySubs[req.topic.topic]) == 0 {
|
||||
if len(topic.evtHandlers) == 0 &&
|
||||
len(p.mySubs[req.topic.topic]) == 0 &&
|
||||
p.myRelays[req.topic.topic] == 0 {
|
||||
delete(p.myTopics, topic.topic)
|
||||
req.resp <- nil
|
||||
return
|
||||
|
@ -621,6 +623,10 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
|||
|
||||
if len(subs) == 0 {
|
||||
delete(p.mySubs, sub.topic)
|
||||
}
|
||||
|
||||
// stop announcing only if there are no more subs and relays
|
||||
if len(subs) == 0 && p.myRelays[sub.topic] == 0 {
|
||||
p.disc.StopAdvertise(sub.topic)
|
||||
p.announce(sub.topic, false)
|
||||
p.rt.Leave(sub.topic)
|
||||
|
@ -635,8 +641,8 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
|
|||
sub := req.sub
|
||||
subs := p.mySubs[sub.topic]
|
||||
|
||||
// announce we want this topic
|
||||
if len(subs) == 0 {
|
||||
// announce we want this topic if neither subs nor relays exist so far
|
||||
if len(subs) == 0 && p.myRelays[sub.topic] == 0 {
|
||||
p.disc.Advertise(sub.topic)
|
||||
p.announce(sub.topic, true)
|
||||
p.rt.Join(sub.topic)
|
||||
|
@ -659,20 +665,20 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
|
|||
// relays for the topic.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleAddRelay(req *addRelayReq) {
|
||||
relays := p.myRelays[req.topic]
|
||||
topic := req.topic
|
||||
|
||||
// announce we want this topic
|
||||
if relays == 0 {
|
||||
p.disc.Advertise(req.topic)
|
||||
p.announce(req.topic, true)
|
||||
p.rt.Join(req.topic)
|
||||
// announce we want this topic if neither relays nor subs exist so far
|
||||
if p.myRelays[topic] == 0 && len(p.mySubs[topic]) == 0 {
|
||||
p.disc.Advertise(topic)
|
||||
p.announce(topic, true)
|
||||
p.rt.Join(topic)
|
||||
}
|
||||
|
||||
p.myRelays[req.topic]++
|
||||
p.myRelays[topic]++
|
||||
|
||||
req.resp <- func() {
|
||||
select {
|
||||
case p.rmRelay <- req.topic:
|
||||
case p.rmRelay <- topic:
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
@ -691,6 +697,10 @@ func (p *PubSub) handleRemoveRelay(topic string) {
|
|||
|
||||
if p.myRelays[topic] == 0 {
|
||||
delete(p.myRelays, topic)
|
||||
}
|
||||
|
||||
// stop announcing only if there are no more relays and subs
|
||||
if p.myRelays[topic] == 0 && len(p.mySubs[topic]) == 0 {
|
||||
p.disc.StopAdvertise(topic)
|
||||
p.announce(topic, false)
|
||||
p.rt.Leave(topic)
|
||||
|
@ -722,7 +732,11 @@ func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
|
|||
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
|
||||
|
||||
retry := func() {
|
||||
_, ok := p.mySubs[topic]
|
||||
_, okSubs := p.mySubs[topic]
|
||||
_, okRelays := p.myRelays[topic]
|
||||
|
||||
ok := okSubs || okRelays
|
||||
|
||||
if (ok && sub) || (!ok && !sub) {
|
||||
p.doAnnounceRetry(pid, topic, sub)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue