From 1c9a5765266d3903b6a0da4f2a93ff3e6362da1b Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Fri, 11 Nov 2016 16:22:47 +0100 Subject: [PATCH] Test, SubscribeByTopicDescriptor and minor improvement --- floodsub.go | 20 ++++++++++++++++---- floodsub_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/floodsub.go b/floodsub.go index 40f530b..b70c5c7 100644 --- a/floodsub.go +++ b/floodsub.go @@ -132,10 +132,8 @@ func (p *PubSub) processLoop(ctx context.Context) { } case treq := <-p.getTopics: var out []string - for t, subs := range p.myTopics { - if len(subs) > 0 { - out = append(out, t) - } + for t := range p.myTopics { + out = append(out, t) } treq.resp <- out case sub := <-p.cancelCh: @@ -190,6 +188,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { delete(subs, sub) if len(subs) == 0 { + delete(p.myTopics, sub.topic) p.announce(sub.topic, false) } } @@ -367,6 +366,19 @@ func (p *PubSub) Subscribe(topic string) *Subscription { return <-out } +// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor +func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscription, error) { + if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { + return nil, fmt.Errorf("auth mode not yet supported") + } + + if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { + return nil, fmt.Errorf("encryption mode not yet supported") + } + + return p.Subscribe(td.GetName()), nil +} + type topicReq struct { resp chan []string } diff --git a/floodsub_test.go b/floodsub_test.go index d386c7a..371cca5 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -441,6 +441,45 @@ func TestPeerTopicReporting(t *testing.T) { assertPeerList(t, peers, hosts[1].ID()) } +func TestSubscribeMultipleTimes(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + + sub1 := psubs[0].Subscribe("foo") + sub2 := psubs[0].Subscribe("foo") + + // make sure subscribing is finished by the time we publish + time.Sleep(1 * time.Millisecond) + + psubs[1].Publish("foo", []byte("bar")) + + msg, err := sub1.Next() + if err != nil { + t.Fatalf("unexpected error: %v.", err) + } + + data := string(msg.GetData()) + + if data != "bar" { + t.Fatalf("data is %s, expected %s.", data, "bar") + } + + msg, err = sub2.Next() + if err != nil { + t.Fatalf("unexpected error: %v.", err) + } + data = string(msg.GetData()) + + if data != "bar" { + t.Fatalf("data is %s, expected %s.", data, "bar") + } +} + func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) { sort.Sort(peer.IDSlice(peers)) sort.Sort(peer.IDSlice(expected))