Test, SubscribeByTopicDescriptor and minor improvement
This commit is contained in:
parent
c9b2c6c8fd
commit
1c9a576526
20
floodsub.go
20
floodsub.go
|
@ -132,10 +132,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
}
|
||||
case treq := <-p.getTopics:
|
||||
var out []string
|
||||
for t, subs := range p.myTopics {
|
||||
if len(subs) > 0 {
|
||||
out = append(out, t)
|
||||
}
|
||||
for t := range p.myTopics {
|
||||
out = append(out, t)
|
||||
}
|
||||
treq.resp <- out
|
||||
case sub := <-p.cancelCh:
|
||||
|
@ -190,6 +188,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
|||
delete(subs, sub)
|
||||
|
||||
if len(subs) == 0 {
|
||||
delete(p.myTopics, sub.topic)
|
||||
p.announce(sub.topic, false)
|
||||
}
|
||||
}
|
||||
|
@ -367,6 +366,19 @@ func (p *PubSub) Subscribe(topic string) *Subscription {
|
|||
return <-out
|
||||
}
|
||||
|
||||
// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
|
||||
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscription, error) {
|
||||
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
|
||||
return nil, fmt.Errorf("auth mode not yet supported")
|
||||
}
|
||||
|
||||
if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
|
||||
return nil, fmt.Errorf("encryption mode not yet supported")
|
||||
}
|
||||
|
||||
return p.Subscribe(td.GetName()), nil
|
||||
}
|
||||
|
||||
type topicReq struct {
|
||||
resp chan []string
|
||||
}
|
||||
|
|
|
@ -441,6 +441,45 @@ func TestPeerTopicReporting(t *testing.T) {
|
|||
assertPeerList(t, peers, hosts[1].ID())
|
||||
}
|
||||
|
||||
func TestSubscribeMultipleTimes(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts := getNetHosts(t, ctx, 2)
|
||||
psubs := getPubsubs(ctx, hosts)
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
|
||||
sub1 := psubs[0].Subscribe("foo")
|
||||
sub2 := psubs[0].Subscribe("foo")
|
||||
|
||||
// make sure subscribing is finished by the time we publish
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
|
||||
psubs[1].Publish("foo", []byte("bar"))
|
||||
|
||||
msg, err := sub1.Next()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v.", err)
|
||||
}
|
||||
|
||||
data := string(msg.GetData())
|
||||
|
||||
if data != "bar" {
|
||||
t.Fatalf("data is %s, expected %s.", data, "bar")
|
||||
}
|
||||
|
||||
msg, err = sub2.Next()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v.", err)
|
||||
}
|
||||
data = string(msg.GetData())
|
||||
|
||||
if data != "bar" {
|
||||
t.Fatalf("data is %s, expected %s.", data, "bar")
|
||||
}
|
||||
}
|
||||
|
||||
func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) {
|
||||
sort.Sort(peer.IDSlice(peers))
|
||||
sort.Sort(peer.IDSlice(expected))
|
||||
|
|
Loading…
Reference in New Issue