first cleanup wave

This commit is contained in:
Jan Winkelmann 2016-10-20 01:10:29 +02:00
parent 822640a482
commit b71e3adbd7
2 changed files with 24 additions and 80 deletions

View File

@ -337,31 +337,14 @@ type addSubReq struct {
resp chan *Subscription
}
func (p *PubSub) Subscribe(topic string) (*Subscription, error) {
td := &pb.TopicDescriptor{
Name: proto.String(topic),
}
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("Auth method not yet supported")
}
if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
return nil, fmt.Errorf("Encryption method not yet supported")
}
func (p *PubSub) Subscribe(topic string) *Subscription {
out := make(chan *Subscription, 1)
p.addSub <- &addSubReq{
topic: topic,
resp: out,
}
resp := <-out
if resp == nil {
return nil, fmt.Errorf("not subscribed to topic %s", topic)
}
return resp, nil
return <-out
}
type topicReq struct {

View File

@ -106,10 +106,7 @@ func TestBasicFloodsub(t *testing.T) {
var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
subch := ps.Subscribe("foobar")
msgs = append(msgs, subch)
}
@ -155,10 +152,7 @@ func TestMultihops(t *testing.T) {
var subs []*Subscription
for i := 1; i < 6; i++ {
ch, err := psubs[i].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
ch := psubs[i].Subscribe("foobar")
subs = append(subs, ch)
}
@ -192,20 +186,14 @@ func TestReconnects(t *testing.T) {
connect(t, hosts[0], hosts[1])
connect(t, hosts[0], hosts[2])
A, err := psubs[1].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
A := psubs[1].Subscribe("cats")
B, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
B := psubs[2].Subscribe("cats")
time.Sleep(time.Millisecond * 100)
msg := []byte("apples and oranges")
err = psubs[0].Publish("cats", msg)
err := psubs[0].Publish("cats", msg)
if err != nil {
t.Fatal(err)
}
@ -238,10 +226,7 @@ func TestReconnects(t *testing.T) {
t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs)
}
ch2, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
ch2 := psubs[2].Subscribe("cats")
time.Sleep(time.Millisecond * 100)
@ -263,12 +248,9 @@ func TestNoConnection(t *testing.T) {
psubs := getPubsubs(ctx, hosts)
ch, err := psubs[5].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
ch := psubs[5].Subscribe("foobar")
err = psubs[0].Publish("foobar", []byte("TESTING"))
err := psubs[0].Publish("foobar", []byte("TESTING"))
if err != nil {
t.Fatal(err)
}
@ -297,10 +279,7 @@ func TestSelfReceive(t *testing.T) {
time.Sleep(time.Millisecond * 10)
ch, err := psub.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
ch := psub.Subscribe("foobar")
msg2 := []byte("goodbye world")
err = psub.Publish("foobar", msg2)
@ -320,10 +299,7 @@ func TestOneToOne(t *testing.T) {
connect(t, hosts[0], hosts[1])
ch, err := psubs[1].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
ch := psubs[1].Subscribe("foobar")
time.Sleep(time.Millisecond * 50)
@ -373,10 +349,7 @@ func TestTreeTopology(t *testing.T) {
var chs []*Subscription
for _, ps := range psubs {
ch, err := ps.Subscribe("fizzbuzz")
if err != nil {
t.Fatal(err)
}
ch := ps.Subscribe("fizzbuzz")
chs = append(chs, ch)
}
@ -413,22 +386,13 @@ func TestSubReporting(t *testing.T) {
host := getNetHosts(t, ctx, 1)[0]
psub := NewFloodSub(ctx, host)
fooSub, err := psub.Subscribe("foo")
if err != nil {
t.Fatal(err)
}
fooSub := psub.Subscribe("foo")
barSub, err := psub.Subscribe("bar")
if err != nil {
t.Fatal(err)
}
barSub := psub.Subscribe("bar")
assertHasTopics(t, psub, "foo", "bar")
_, err = psub.Subscribe("baz")
if err != nil {
t.Fatal(err)
}
_ = psub.Subscribe("baz")
assertHasTopics(t, psub, "foo", "bar", "baz")
@ -437,10 +401,7 @@ func TestSubReporting(t *testing.T) {
fooSub.Cancel()
assertHasTopics(t, psub, "baz")
_, err = psub.Subscribe("fish")
if err != nil {
t.Fatal(err)
}
_ = psub.Subscribe("fish")
assertHasTopics(t, psub, "baz", "fish")
}
@ -456,15 +417,15 @@ func TestPeerTopicReporting(t *testing.T) {
connect(t, hosts[0], hosts[2])
connect(t, hosts[0], hosts[3])
psubs[1].Subscribe(ctx, "foo")
psubs[1].Subscribe(ctx, "bar")
psubs[1].Subscribe(ctx, "baz")
psubs[1].Subscribe("foo")
psubs[1].Subscribe("bar")
psubs[1].Subscribe("baz")
psubs[2].Subscribe(ctx, "foo")
psubs[2].Subscribe(ctx, "ipfs")
psubs[2].Subscribe("foo")
psubs[2].Subscribe("ipfs")
psubs[3].Subscribe(ctx, "baz")
psubs[3].Subscribe(ctx, "ipfs")
psubs[3].Subscribe("baz")
psubs[3].Subscribe("ipfs")
time.Sleep(time.Millisecond * 10)
peers := psubs[0].ListPeers("ipfs")