make Subscribe go through SubscribeByTopicDescriptor

This commit is contained in:
Jan Winkelmann 2016-11-12 00:47:53 +01:00
parent 1c9a576526
commit ae48a15d7b
2 changed files with 101 additions and 33 deletions

View File

@ -356,14 +356,10 @@ type addSubReq struct {
}
// Subscribe returns a new Subscription for the given topic
func (p *PubSub) Subscribe(topic string) *Subscription {
out := make(chan *Subscription, 1)
p.addSub <- &addSubReq{
topic: topic,
resp: out,
}
func (p *PubSub) Subscribe(topic string) (*Subscription, error) {
td := pb.TopicDescriptor{Name: &topic}
return <-out
return p.SubscribeByTopicDescriptor(&td)
}
// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
@ -376,7 +372,13 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscripti
return nil, fmt.Errorf("encryption mode not yet supported")
}
return p.Subscribe(td.GetName()), nil
out := make(chan *Subscription, 1)
p.addSub <- &addSubReq{
topic: td.GetName(),
resp: out,
}
return <-out, nil
}
type topicReq struct {

View File

@ -106,7 +106,10 @@ func TestBasicFloodsub(t *testing.T) {
var msgs []*Subscription
for _, ps := range psubs {
subch := ps.Subscribe("foobar")
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
msgs = append(msgs, subch)
}
@ -152,7 +155,10 @@ func TestMultihops(t *testing.T) {
var subs []*Subscription
for i := 1; i < 6; i++ {
ch := psubs[i].Subscribe("foobar")
ch, err := psubs[i].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
subs = append(subs, ch)
}
@ -186,14 +192,20 @@ func TestReconnects(t *testing.T) {
connect(t, hosts[0], hosts[1])
connect(t, hosts[0], hosts[2])
A := psubs[1].Subscribe("cats")
A, err := psubs[1].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
B := psubs[2].Subscribe("cats")
B, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
msg := []byte("apples and oranges")
err := psubs[0].Publish("cats", msg)
err = psubs[0].Publish("cats", msg)
if err != nil {
t.Fatal(err)
}
@ -226,7 +238,10 @@ func TestReconnects(t *testing.T) {
t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs)
}
ch2 := psubs[2].Subscribe("cats")
ch2, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
@ -248,9 +263,12 @@ func TestNoConnection(t *testing.T) {
psubs := getPubsubs(ctx, hosts)
ch := psubs[5].Subscribe("foobar")
ch, err := psubs[5].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
err := psubs[0].Publish("foobar", []byte("TESTING"))
err = psubs[0].Publish("foobar", []byte("TESTING"))
if err != nil {
t.Fatal(err)
}
@ -279,7 +297,10 @@ func TestSelfReceive(t *testing.T) {
time.Sleep(time.Millisecond * 10)
ch := psub.Subscribe("foobar")
ch, err := psub.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
msg2 := []byte("goodbye world")
err = psub.Publish("foobar", msg2)
@ -299,7 +320,10 @@ func TestOneToOne(t *testing.T) {
connect(t, hosts[0], hosts[1])
ch := psubs[1].Subscribe("foobar")
ch, err := psubs[1].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
@ -349,7 +373,10 @@ func TestTreeTopology(t *testing.T) {
var chs []*Subscription
for _, ps := range psubs {
ch := ps.Subscribe("fizzbuzz")
ch, err := ps.Subscribe("fizzbuzz")
if err != nil {
t.Fatal(err)
}
chs = append(chs, ch)
}
@ -386,13 +413,22 @@ func TestSubReporting(t *testing.T) {
host := getNetHosts(t, ctx, 1)[0]
psub := NewFloodSub(ctx, host)
fooSub := psub.Subscribe("foo")
fooSub, err := psub.Subscribe("foo")
if err != nil {
t.Fatal(err)
}
barSub := psub.Subscribe("bar")
barSub, err := psub.Subscribe("bar")
if err != nil {
t.Fatal(err)
}
assertHasTopics(t, psub, "foo", "bar")
_ = psub.Subscribe("baz")
_, err = psub.Subscribe("baz")
if err != nil {
t.Fatal(err)
}
assertHasTopics(t, psub, "foo", "bar", "baz")
@ -401,7 +437,10 @@ func TestSubReporting(t *testing.T) {
fooSub.Cancel()
assertHasTopics(t, psub, "baz")
_ = psub.Subscribe("fish")
_, err = psub.Subscribe("fish")
if err != nil {
t.Fatal(err)
}
assertHasTopics(t, psub, "baz", "fish")
}
@ -417,17 +456,38 @@ func TestPeerTopicReporting(t *testing.T) {
connect(t, hosts[0], hosts[2])
connect(t, hosts[0], hosts[3])
psubs[1].Subscribe("foo")
psubs[1].Subscribe("bar")
psubs[1].Subscribe("baz")
_, err := psubs[1].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
_, err = psubs[1].Subscribe("bar")
if err != nil {
t.Fatal(err)
}
_, err = psubs[1].Subscribe("baz")
if err != nil {
t.Fatal(err)
}
psubs[2].Subscribe("foo")
psubs[2].Subscribe("ipfs")
_, err = psubs[2].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
_, err = psubs[2].Subscribe("ipfs")
if err != nil {
t.Fatal(err)
}
psubs[3].Subscribe("baz")
psubs[3].Subscribe("ipfs")
_, err = psubs[3].Subscribe("baz")
if err != nil {
t.Fatal(err)
}
_, err = psubs[3].Subscribe("ipfs")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 1)
peers := psubs[0].ListPeers("ipfs")
assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID())
@ -450,8 +510,14 @@ func TestSubscribeMultipleTimes(t *testing.T) {
connect(t, hosts[0], hosts[1])
sub1 := psubs[0].Subscribe("foo")
sub2 := psubs[0].Subscribe("foo")
sub1, err := psubs[0].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
sub2, err := psubs[0].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
// make sure subscribing is finished by the time we publish
time.Sleep(1 * time.Millisecond)