diff --git a/randomsub_test.go b/randomsub_test.go index 9873138..4e44f17 100644 --- a/randomsub_test.go +++ b/randomsub_test.go @@ -25,6 +25,17 @@ func getRandomsubs(ctx context.Context, hs []host.Host, size int, opts ...Option return psubs } +func tryReceive(sub *Subscription) *Message { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + m, err := sub.Next(ctx) + if err != nil { + return nil + } else { + return m + } +} + func TestRandomsubSmall(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -45,14 +56,21 @@ func TestRandomsubSmall(t *testing.T) { time.Sleep(time.Second) + count := 0 for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("message %d", i)) psubs[i].Publish("test", msg) for _, sub := range subs { - assertReceive(t, sub, msg) + if tryReceive(sub) != nil { + count++ + } } } + + if count < 9*len(hosts) { + t.Fatalf("received too few messages; expected at least %d but got %d", 9*len(hosts), count) + } } func TestRandomsubBig(t *testing.T) { @@ -75,14 +93,21 @@ func TestRandomsubBig(t *testing.T) { time.Sleep(time.Second) + count := 0 for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("message %d", i)) psubs[i].Publish("test", msg) for _, sub := range subs { - assertReceive(t, sub, msg) + if tryReceive(sub) != nil { + count++ + } } } + + if count < 9*len(hosts) { + t.Fatalf("received too few messages; expected at least %d but got %d", 9*len(hosts), count) + } } func TestRandomsubMixed(t *testing.T) { @@ -107,14 +132,21 @@ func TestRandomsubMixed(t *testing.T) { time.Sleep(time.Second) + count := 0 for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("message %d", i)) psubs[i].Publish("test", msg) for _, sub := range subs { - assertReceive(t, sub, msg) + if tryReceive(sub) != nil { + count++ + } } } + + if count < 9*len(hosts) { + t.Fatalf("received too few messages; expected at least %d but got %d", 9*len(hosts), count) + } } func TestRandomsubEnoughPeers(t *testing.T) {