package floodsub import ( "bytes" "context" "fmt" "math/rand" "testing" "time" host "github.com/libp2p/go-libp2p-host" ) func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { var psubs []*PubSub for _, h := range hs { ps, err := NewGossipSub(ctx, h, opts...) if err != nil { panic(err) } psubs = append(psubs, ps) } return psubs } func TestSparseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 20) psubs := getGossipsubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs = append(msgs, subch) } sparseConnect(t, hosts) // wait for heartbeats to build mesh time.Sleep(time.Second * 2) for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) for _, sub := range msgs { got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } } } } func TestDenseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 20) psubs := getGossipsubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs = append(msgs, subch) } denseConnect(t, hosts) // wait for heartbeats to build mesh time.Sleep(time.Second * 2) for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) for _, sub := range msgs { got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } } } } func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 20) psubs := getGossipsubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs = append(msgs, subch) } denseConnect(t, hosts) // wait for heartbeats to build mesh time.Sleep(time.Second * 2) for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) for _, sub := range msgs { got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } } // wait a bit to have some gossip interleaved time.Sleep(time.Millisecond * 100) } // and wait for some gossip flushing time.Sleep(time.Second * 2) } func TestGossipsubPrune(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 20) psubs := getGossipsubs(ctx, hosts) var msgs []*Subscription for _, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs = append(msgs, subch) } denseConnect(t, hosts) // wait for heartbeats to build mesh time.Sleep(time.Second * 2) // disconnect some peers from the mesh to get some PRUNEs for _, sub := range msgs[:5] { sub.Cancel() } // wait a bit to take effect time.Sleep(time.Millisecond * 100) for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) for _, sub := range msgs[5:] { got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } } } } func TestGossipsubGraft(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 20) psubs := getGossipsubs(ctx, hosts) sparseConnect(t, hosts) time.Sleep(time.Second * 1) var msgs []*Subscription for _, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs = append(msgs, subch) // wait for announce to propagate time.Sleep(time.Millisecond * 100) } time.Sleep(time.Second * 1) for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) for _, sub := range msgs { got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } } } } func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 30) gsubs := getGossipsubs(ctx, hosts[:20]) fsubs := getPubsubs(ctx, hosts[20:]) psubs := append(gsubs, fsubs...) var msgs []*Subscription for _, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs = append(msgs, subch) } sparseConnect(t, hosts) // wait for heartbeats to build mesh time.Sleep(time.Second * 2) for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) owner := rand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) for _, sub := range msgs { got, err := sub.Next(ctx) if err != nil { t.Fatal(sub.err) } if !bytes.Equal(msg, got.Data) { t.Fatal("got wrong message!") } } } } func TestGossipsubMultihops(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 6) psubs := getGossipsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) connect(t, hosts[2], hosts[3]) connect(t, hosts[3], hosts[4]) connect(t, hosts[4], hosts[5]) var subs []*Subscription for i := 1; i < 6; i++ { ch, err := psubs[i].Subscribe("foobar") if err != nil { t.Fatal(err) } subs = append(subs, ch) } // wait for heartbeats to build mesh time.Sleep(time.Second * 2) msg := []byte("i like cats") err := psubs[0].Publish("foobar", msg) if err != nil { t.Fatal(err) } // last node in the chain should get the message select { case out := <-subs[4].ch: if !bytes.Equal(out.GetData(), msg) { t.Fatal("got wrong data") } case <-time.After(time.Second * 5): t.Fatal("timed out waiting for message") } } func TestGossipsubTreeTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getNetHosts(t, ctx, 10) psubs := getGossipsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) connect(t, hosts[1], hosts[2]) connect(t, hosts[1], hosts[4]) connect(t, hosts[2], hosts[3]) connect(t, hosts[0], hosts[5]) connect(t, hosts[5], hosts[6]) connect(t, hosts[5], hosts[8]) connect(t, hosts[6], hosts[7]) connect(t, hosts[8], hosts[9]) /* [0] -> [1] -> [2] -> [3] | L->[4] v [5] -> [6] -> [7] | v [8] -> [9] */ var chs []*Subscription for _, ps := range psubs { ch, err := ps.Subscribe("fizzbuzz") if err != nil { t.Fatal(err) } chs = append(chs, ch) } // wait for heartbeats to build mesh time.Sleep(time.Second * 2) assertPeerLists(t, hosts, psubs[0], 1, 5) assertPeerLists(t, hosts, psubs[1], 0, 2, 4) assertPeerLists(t, hosts, psubs[2], 1, 3) checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs) }