mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 13:53:06 +00:00
test control message retry piggybacking
This commit is contained in:
parent
f5d6cf3bd1
commit
a71eec5c3a
@ -641,6 +641,88 @@ func TestGossipsubGraftPruneRetry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGossipsubControlPiggyback(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts := getNetHosts(t, ctx, 20)
|
||||
psubs := getGossipsubs(ctx, hosts)
|
||||
denseConnect(t, hosts)
|
||||
|
||||
for _, ps := range psubs {
|
||||
subch, err := ps.Subscribe("flood")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
go func(sub *Subscription) {
|
||||
for {
|
||||
_, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}(subch)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
// create a background flood of messages that overloads the queues
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
owner := rand.Intn(len(psubs))
|
||||
for i := 0; i < 1000; i++ {
|
||||
msg := []byte("background flooooood")
|
||||
psubs[owner].Publish("flood", msg)
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
time.Sleep(time.Millisecond * 20)
|
||||
|
||||
// and subscribe to a bunch of topics in the meantime -- this should
|
||||
// result in some dropped control messages, with subsequent piggybacking
|
||||
// in the background flood
|
||||
var topics []string
|
||||
var msgs [][]*Subscription
|
||||
for i := 0; i < 5; i++ {
|
||||
topic := fmt.Sprintf("topic%d", i)
|
||||
topics = append(topics, topic)
|
||||
|
||||
var subs []*Subscription
|
||||
for _, ps := range psubs {
|
||||
subch, err := ps.Subscribe(topic)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
subs = append(subs, subch)
|
||||
}
|
||||
msgs = append(msgs, subs)
|
||||
}
|
||||
|
||||
// wait for the flood to stop
|
||||
<-done
|
||||
|
||||
// and test that we have functional overlays
|
||||
for i, topic := range topics {
|
||||
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
|
||||
|
||||
owner := rand.Intn(len(psubs))
|
||||
|
||||
psubs[owner].Publish(topic, msg)
|
||||
|
||||
for _, sub := range msgs[i] {
|
||||
got, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(sub.err)
|
||||
}
|
||||
if !bytes.Equal(msg, got.Data) {
|
||||
t.Fatal("got wrong message!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMixedGossipsub(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user