From b71e3adbd7cc7d24a9193b57ca21c85b090ecfc4 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 20 Oct 2016 01:10:29 +0200 Subject: [PATCH] first cleanup wave --- floodsub.go | 21 ++---------- floodsub_test.go | 83 +++++++++++++----------------------------------- 2 files changed, 24 insertions(+), 80 deletions(-) diff --git a/floodsub.go b/floodsub.go index cdedce3..ac6438b 100644 --- a/floodsub.go +++ b/floodsub.go @@ -337,31 +337,14 @@ type addSubReq struct { resp chan *Subscription } -func (p *PubSub) Subscribe(topic string) (*Subscription, error) { - td := &pb.TopicDescriptor{ - Name: proto.String(topic), - } - - if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { - return nil, fmt.Errorf("Auth method not yet supported") - } - - if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { - return nil, fmt.Errorf("Encryption method not yet supported") - } - +func (p *PubSub) Subscribe(topic string) *Subscription { out := make(chan *Subscription, 1) p.addSub <- &addSubReq{ topic: topic, resp: out, } - resp := <-out - if resp == nil { - return nil, fmt.Errorf("not subscribed to topic %s", topic) - } - - return resp, nil + return <-out } type topicReq struct { diff --git a/floodsub_test.go b/floodsub_test.go index bac78bc..93fa6b7 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -106,10 +106,7 @@ func TestBasicFloodsub(t *testing.T) { var msgs []*Subscription for _, ps := range psubs { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + subch := ps.Subscribe("foobar") msgs = append(msgs, subch) } @@ -155,10 +152,7 @@ func TestMultihops(t *testing.T) { var subs []*Subscription for i := 1; i < 6; i++ { - ch, err := psubs[i].Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psubs[i].Subscribe("foobar") subs = append(subs, ch) } @@ -192,20 +186,14 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A, err := psubs[1].Subscribe("cats") - if err != nil { - t.Fatal(err) - } + A := psubs[1].Subscribe("cats") - B, err := psubs[2].Subscribe("cats") - if err != nil { - t.Fatal(err) - } + B := psubs[2].Subscribe("cats") time.Sleep(time.Millisecond * 100) msg := []byte("apples and oranges") - err = psubs[0].Publish("cats", msg) + err := psubs[0].Publish("cats", msg) if err != nil { t.Fatal(err) } @@ -238,10 +226,7 @@ func TestReconnects(t *testing.T) { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } - ch2, err := psubs[2].Subscribe("cats") - if err != nil { - t.Fatal(err) - } + ch2 := psubs[2].Subscribe("cats") time.Sleep(time.Millisecond * 100) @@ -263,12 +248,9 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch, err := psubs[5].Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psubs[5].Subscribe("foobar") - err = psubs[0].Publish("foobar", []byte("TESTING")) + err := psubs[0].Publish("foobar", []byte("TESTING")) if err != nil { t.Fatal(err) } @@ -297,10 +279,7 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch, err := psub.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psub.Subscribe("foobar") msg2 := []byte("goodbye world") err = psub.Publish("foobar", msg2) @@ -320,10 +299,7 @@ func TestOneToOne(t *testing.T) { connect(t, hosts[0], hosts[1]) - ch, err := psubs[1].Subscribe("foobar") - if err != nil { - t.Fatal(err) - } + ch := psubs[1].Subscribe("foobar") time.Sleep(time.Millisecond * 50) @@ -373,10 +349,7 @@ func TestTreeTopology(t *testing.T) { var chs []*Subscription for _, ps := range psubs { - ch, err := ps.Subscribe("fizzbuzz") - if err != nil { - t.Fatal(err) - } + ch := ps.Subscribe("fizzbuzz") chs = append(chs, ch) } @@ -413,22 +386,13 @@ func TestSubReporting(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] psub := NewFloodSub(ctx, host) - fooSub, err := psub.Subscribe("foo") - if err != nil { - t.Fatal(err) - } + fooSub := psub.Subscribe("foo") - barSub, err := psub.Subscribe("bar") - if err != nil { - t.Fatal(err) - } + barSub := psub.Subscribe("bar") assertHasTopics(t, psub, "foo", "bar") - _, err = psub.Subscribe("baz") - if err != nil { - t.Fatal(err) - } + _ = psub.Subscribe("baz") assertHasTopics(t, psub, "foo", "bar", "baz") @@ -437,10 +401,7 @@ func TestSubReporting(t *testing.T) { fooSub.Cancel() assertHasTopics(t, psub, "baz") - _, err = psub.Subscribe("fish") - if err != nil { - t.Fatal(err) - } + _ = psub.Subscribe("fish") assertHasTopics(t, psub, "baz", "fish") } @@ -456,15 +417,15 @@ func TestPeerTopicReporting(t *testing.T) { connect(t, hosts[0], hosts[2]) connect(t, hosts[0], hosts[3]) - psubs[1].Subscribe(ctx, "foo") - psubs[1].Subscribe(ctx, "bar") - psubs[1].Subscribe(ctx, "baz") + psubs[1].Subscribe("foo") + psubs[1].Subscribe("bar") + psubs[1].Subscribe("baz") - psubs[2].Subscribe(ctx, "foo") - psubs[2].Subscribe(ctx, "ipfs") + psubs[2].Subscribe("foo") + psubs[2].Subscribe("ipfs") - psubs[3].Subscribe(ctx, "baz") - psubs[3].Subscribe(ctx, "ipfs") + psubs[3].Subscribe("baz") + psubs[3].Subscribe("ipfs") time.Sleep(time.Millisecond * 10) peers := psubs[0].ListPeers("ipfs")