diff --git a/floodsub.go b/floodsub.go index b70c5c7..0d7dec5 100644 --- a/floodsub.go +++ b/floodsub.go @@ -356,14 +356,10 @@ type addSubReq struct { } // Subscribe returns a new Subscription for the given topic -func (p *PubSub) Subscribe(topic string) *Subscription { - out := make(chan *Subscription, 1) - p.addSub <- &addSubReq{ - topic: topic, - resp: out, - } +func (p *PubSub) Subscribe(topic string) (*Subscription, error) { + td := pb.TopicDescriptor{Name: &topic} - return <-out + return p.SubscribeByTopicDescriptor(&td) } // SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor @@ -376,7 +372,13 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscripti return nil, fmt.Errorf("encryption mode not yet supported") } - return p.Subscribe(td.GetName()), nil + out := make(chan *Subscription, 1) + p.addSub <- &addSubReq{ + topic: td.GetName(), + resp: out, + } + + return <-out, nil } type topicReq struct { diff --git a/floodsub_test.go b/floodsub_test.go index 371cca5..83c114c 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -106,7 +106,10 @@ func TestBasicFloodsub(t *testing.T) { var msgs []*Subscription for _, ps := range psubs { - subch := ps.Subscribe("foobar") + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } msgs = append(msgs, subch) } @@ -152,7 +155,10 @@ func TestMultihops(t *testing.T) { var subs []*Subscription for i := 1; i < 6; i++ { - ch := psubs[i].Subscribe("foobar") + ch, err := psubs[i].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } subs = append(subs, ch) } @@ -186,14 +192,20 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A := psubs[1].Subscribe("cats") + A, err := psubs[1].Subscribe("cats") + if err != nil { + t.Fatal(err) + } - B := psubs[2].Subscribe("cats") + B, err := psubs[2].Subscribe("cats") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 100) msg := []byte("apples and oranges") - err := psubs[0].Publish("cats", msg) + err = psubs[0].Publish("cats", msg) if err != nil { t.Fatal(err) } @@ -226,7 +238,10 @@ func TestReconnects(t *testing.T) { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } - ch2 := psubs[2].Subscribe("cats") + ch2, err := psubs[2].Subscribe("cats") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 100) @@ -248,9 +263,12 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch := psubs[5].Subscribe("foobar") + ch, err := psubs[5].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } - err := psubs[0].Publish("foobar", []byte("TESTING")) + err = psubs[0].Publish("foobar", []byte("TESTING")) if err != nil { t.Fatal(err) } @@ -279,7 +297,10 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch := psub.Subscribe("foobar") + ch, err := psub.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } msg2 := []byte("goodbye world") err = psub.Publish("foobar", msg2) @@ -299,7 +320,10 @@ func TestOneToOne(t *testing.T) { connect(t, hosts[0], hosts[1]) - ch := psubs[1].Subscribe("foobar") + ch, err := psubs[1].Subscribe("foobar") + if err != nil { + t.Fatal(err) + } time.Sleep(time.Millisecond * 50) @@ -349,7 +373,10 @@ func TestTreeTopology(t *testing.T) { var chs []*Subscription for _, ps := range psubs { - ch := ps.Subscribe("fizzbuzz") + ch, err := ps.Subscribe("fizzbuzz") + if err != nil { + t.Fatal(err) + } chs = append(chs, ch) } @@ -386,13 +413,22 @@ func TestSubReporting(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] psub := NewFloodSub(ctx, host) - fooSub := psub.Subscribe("foo") + fooSub, err := psub.Subscribe("foo") + if err != nil { + t.Fatal(err) + } - barSub := psub.Subscribe("bar") + barSub, err := psub.Subscribe("bar") + if err != nil { + t.Fatal(err) + } assertHasTopics(t, psub, "foo", "bar") - _ = psub.Subscribe("baz") + _, err = psub.Subscribe("baz") + if err != nil { + t.Fatal(err) + } assertHasTopics(t, psub, "foo", "bar", "baz") @@ -401,7 +437,10 @@ func TestSubReporting(t *testing.T) { fooSub.Cancel() assertHasTopics(t, psub, "baz") - _ = psub.Subscribe("fish") + _, err = psub.Subscribe("fish") + if err != nil { + t.Fatal(err) + } assertHasTopics(t, psub, "baz", "fish") } @@ -417,17 +456,38 @@ func TestPeerTopicReporting(t *testing.T) { connect(t, hosts[0], hosts[2]) connect(t, hosts[0], hosts[3]) - psubs[1].Subscribe("foo") - psubs[1].Subscribe("bar") - psubs[1].Subscribe("baz") + _, err := psubs[1].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + _, err = psubs[1].Subscribe("bar") + if err != nil { + t.Fatal(err) + } + _, err = psubs[1].Subscribe("baz") + if err != nil { + t.Fatal(err) + } - psubs[2].Subscribe("foo") - psubs[2].Subscribe("ipfs") + _, err = psubs[2].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + _, err = psubs[2].Subscribe("ipfs") + if err != nil { + t.Fatal(err) + } - psubs[3].Subscribe("baz") - psubs[3].Subscribe("ipfs") + _, err = psubs[3].Subscribe("baz") + if err != nil { + t.Fatal(err) + } + _, err = psubs[3].Subscribe("ipfs") + if err != nil { + t.Fatal(err) + } - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 1) peers := psubs[0].ListPeers("ipfs") assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID()) @@ -450,8 +510,14 @@ func TestSubscribeMultipleTimes(t *testing.T) { connect(t, hosts[0], hosts[1]) - sub1 := psubs[0].Subscribe("foo") - sub2 := psubs[0].Subscribe("foo") + sub1, err := psubs[0].Subscribe("foo") + if err != nil { + t.Fatal(err) + } + sub2, err := psubs[0].Subscribe("foo") + if err != nil { + t.Fatal(err) + } // make sure subscribing is finished by the time we publish time.Sleep(1 * time.Millisecond)