add way to query subscribed topics
This commit is contained in:
parent
d733293dbc
commit
28f2c2f094
22
floodsub.go
22
floodsub.go
|
@ -33,6 +33,9 @@ type PubSub struct {
|
|||
// addSub is a control channel for us to add and remove subscriptions
|
||||
addSub chan *addSub
|
||||
|
||||
//
|
||||
getTopics chan *topicReq
|
||||
|
||||
// a notification channel for incoming streams from other peers
|
||||
newPeers chan inet.Stream
|
||||
|
||||
|
@ -75,6 +78,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
|
|||
newPeers: make(chan inet.Stream),
|
||||
peerDead: make(chan peer.ID),
|
||||
addSub: make(chan *addSub),
|
||||
getTopics: make(chan *topicReq),
|
||||
myTopics: make(map[string]chan *Message),
|
||||
topics: make(map[string]map[peer.ID]struct{}),
|
||||
peers: make(map[peer.ID]chan *RPC),
|
||||
|
@ -112,6 +116,12 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
for _, t := range p.topics {
|
||||
delete(t, pid)
|
||||
}
|
||||
case treq := <-p.getTopics:
|
||||
var out []string
|
||||
for t := range p.myTopics {
|
||||
out = append(out, t)
|
||||
}
|
||||
treq.resp <- out
|
||||
case sub := <-p.addSub:
|
||||
p.handleSubscriptionChange(sub)
|
||||
case rpc := <-p.incoming:
|
||||
|
@ -270,12 +280,22 @@ type addSub struct {
|
|||
resp chan chan *Message
|
||||
}
|
||||
|
||||
func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) {
|
||||
func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
|
||||
return p.SubscribeComplicated(&pb.TopicDescriptor{
|
||||
Name: proto.String(topic),
|
||||
})
|
||||
}
|
||||
|
||||
type topicReq struct {
|
||||
resp chan []string
|
||||
}
|
||||
|
||||
func (p *PubSub) GetTopics() []string {
|
||||
out := make(chan []string, 1)
|
||||
p.getTopics <- &topicReq{resp: out}
|
||||
return <-out
|
||||
}
|
||||
|
||||
func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) {
|
||||
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
|
||||
return nil, fmt.Errorf("Auth method not yet supported")
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -360,3 +361,58 @@ func TestTreeTopology(t *testing.T) {
|
|||
|
||||
checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs)
|
||||
}
|
||||
|
||||
func assertHasTopics(t *testing.T, ps *PubSub, exptopics ...string) {
|
||||
topics := ps.GetTopics()
|
||||
sort.Strings(topics)
|
||||
sort.Strings(exptopics)
|
||||
|
||||
if len(topics) != len(exptopics) {
|
||||
t.Fatalf("expected to have %v, but got %v", exptopics, topics)
|
||||
}
|
||||
|
||||
for i, v := range exptopics {
|
||||
if topics[i] != v {
|
||||
t.Fatalf("expected %s but have %s", v, topics[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubReporting(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
host := getNetHosts(t, ctx, 1)[0]
|
||||
psub := NewFloodSub(ctx, host)
|
||||
|
||||
_, err := psub.Subscribe(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = psub.Subscribe(ctx, "bar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertHasTopics(t, psub, "foo", "bar")
|
||||
|
||||
_, err = psub.Subscribe(ctx, "baz")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertHasTopics(t, psub, "foo", "bar", "baz")
|
||||
|
||||
psub.Unsub("bar")
|
||||
assertHasTopics(t, psub, "foo", "baz")
|
||||
psub.Unsub("foo")
|
||||
assertHasTopics(t, psub, "baz")
|
||||
|
||||
_, err = psub.Subscribe(ctx, "fish")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertHasTopics(t, psub, "baz", "fish")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue