diff --git a/floodsub_test.go b/floodsub_test.go index 5bfac6f..86f1b6a 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -12,6 +12,22 @@ import ( netutil "github.com/libp2p/go-libp2p/p2p/test/util" ) +func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []<-chan *Message) { + data := make([]byte, 16) + rand.Read(data) + + for _, p := range pubs { + err := p.Publish(topic, data) + if err != nil { + t.Fatal(err) + } + + for _, s := range subs { + assertReceive(t, s, data) + } + } +} + func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host { var out []host.Host @@ -87,7 +103,7 @@ func TestBasicFloodsub(t *testing.T) { var msgs []<-chan *Message for _, ps := range psubs { - subch, err := ps.Subscribe("foobar") + subch, err := ps.Subscribe(ctx, "foobar") if err != nil { t.Fatal(err) } @@ -133,7 +149,7 @@ func TestMultihops(t *testing.T) { var msgChs []<-chan *Message for i := 1; i < 6; i++ { - ch, err := psubs[i].Subscribe("foobar") + ch, err := psubs[i].Subscribe(ctx, "foobar") if err != nil { t.Fatal(err) } @@ -170,12 +186,12 @@ func TestReconnects(t *testing.T) { connect(t, hosts[0], hosts[1]) connect(t, hosts[0], hosts[2]) - A, err := psubs[1].Subscribe("cats") + A, err := psubs[1].Subscribe(ctx, "cats") if err != nil { t.Fatal(err) } - B, err := psubs[2].Subscribe("cats") + B, err := psubs[2].Subscribe(ctx, "cats") if err != nil { t.Fatal(err) } @@ -211,7 +227,7 @@ func TestReconnects(t *testing.T) { t.Fatal("timed out waiting for B chan to be closed") } - ch2, err := psubs[2].Subscribe("cats") + ch2, err := psubs[2].Subscribe(ctx, "cats") if err != nil { t.Fatal(err) } @@ -236,7 +252,7 @@ func TestNoConnection(t *testing.T) { psubs := getPubsubs(ctx, hosts) - ch, err := psubs[5].Subscribe("foobar") + ch, err := psubs[5].Subscribe(ctx, "foobar") if err != nil { t.Fatal(err) } @@ -270,7 +286,7 @@ func TestSelfReceive(t *testing.T) { time.Sleep(time.Millisecond * 10) - ch, err := psub.Subscribe("foobar") + ch, err := psub.Subscribe(ctx, "foobar") if err != nil { t.Fatal(err) } @@ -283,3 +299,64 @@ func TestSelfReceive(t *testing.T) { assertReceive(t, ch, msg2) } + +func TestOneToOne(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + + ch, err := psubs[1].Subscribe(ctx, "foobar") + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 50) + + checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch}) +} + +func TestTreeTopology(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 10) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[1], hosts[4]) + connect(t, hosts[2], hosts[3]) + connect(t, hosts[0], hosts[5]) + connect(t, hosts[5], hosts[6]) + connect(t, hosts[5], hosts[8]) + connect(t, hosts[6], hosts[7]) + connect(t, hosts[8], hosts[9]) + + /* + [0] -> [1] -> [2] -> [3] + | L->[4] + v + [5] -> [6] -> [7] + | + v + [8] -> [9] + */ + + var chs []<-chan *Message + for _, ps := range psubs { + ch, err := ps.Subscribe(ctx, "fizzbuzz") + if err != nil { + t.Fatal(err) + } + + chs = append(chs, ch) + } + + time.Sleep(time.Millisecond * 50) + + checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs) +}