From e72d15cc90b8b23ba46506a845de76b2cdc14348 Mon Sep 17 00:00:00 2001 From: Lukasz Zimnoch Date: Thu, 30 Apr 2020 10:39:53 +0200 Subject: [PATCH] Tests for topic relay --- topic_test.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/topic_test.go b/topic_test.go index 7beafa0..a4bf9e4 100644 --- a/topic_test.go +++ b/topic_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "math/rand" "sync" "testing" "time" @@ -71,6 +72,22 @@ func TestTopicCloseWithOpenEventHandler(t *testing.T) { ) } +func TestTopicCloseWithOpenRelay(t *testing.T) { + var relayCancel RelayCancelFunc + var err error + testTopicCloseWithOpenResource(t, + func(topic *Topic) { + relayCancel, err = topic.Relay() + if err != nil { + t.Fatal(err) + } + }, + func() { + relayCancel() + }, + ) +} + func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic), closeResource func()) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -510,6 +527,134 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) { notifSubThenUnSub(ctx, t, topics) } +func TestTopicRelay(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + const topic = "foobar" + const numHosts = 5 + + hosts := getNetHosts(t, ctx, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), topic) + + // [0.Rel] - [1.Rel] - [2.Sub] + // | + // [3.Rel] - [4.Sub] + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[1], hosts[3]) + connect(t, hosts[3], hosts[4]) + + time.Sleep(time.Millisecond * 100) + + var subs []*Subscription + + for i, topic := range topics { + if i == 2 || i == 4 { + sub, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + + subs = append(subs, sub) + } else { + _, err := topic.Relay() + if err != nil { + t.Fatal(err) + } + } + } + + time.Sleep(time.Millisecond * 100) + + for i := 0; i < 100; i++ { + msg := []byte("message") + + owner := rand.Intn(len(topics)) + + err := topics[owner].Publish(ctx, msg) + if err != nil { + t.Fatal(err) + } + + for _, sub := range subs { + received, err := sub.Next(ctx) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(msg, received.Data) { + t.Fatal("received message is other than expected") + } + } + } +} + +func TestTopicRelayReuse(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + const numHosts = 1 + + hosts := getNetHosts(t, ctx, numHosts) + pubsubs := getPubsubs(ctx, hosts) + topics := getTopics(pubsubs, topic) + + relay1Cancel, err := topics[0].Relay() + if err != nil { + t.Fatal(err) + } + + relay2Cancel, err := topics[0].Relay() + if err != nil { + t.Fatal(err) + } + + relay3Cancel, err := topics[0].Relay() + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 100) + + if pubsubs[0].myRelays[topic] != 3 { + t.Fatal("incorrect number of relays") + } + + relay1Cancel() + relay2Cancel() + relay3Cancel() + + time.Sleep(time.Millisecond * 100) + + if pubsubs[0].myRelays[topic] != 0 { + t.Fatal("incorrect number of relays") + } +} + +func TestTopicRelayOnClosedTopic(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + const numHosts = 1 + + hosts := getNetHosts(t, ctx, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), topic) + + err := topics[0].Close() + if err != nil { + t.Fatal(err) + } + + _, err = topics[0].Relay() + if err == nil { + t.Fatalf("error should be returned") + } +} + func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) { primaryTopic := topics[0] msgs := make([]*Subscription, len(topics))