Tests for topic relay

This commit is contained in:
Lukasz Zimnoch 2020-04-30 10:39:53 +02:00 committed by vyzo
parent 3336559a27
commit e72d15cc90
1 changed files with 145 additions and 0 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
@ -71,6 +72,22 @@ func TestTopicCloseWithOpenEventHandler(t *testing.T) {
)
}
func TestTopicCloseWithOpenRelay(t *testing.T) {
var relayCancel RelayCancelFunc
var err error
testTopicCloseWithOpenResource(t,
func(topic *Topic) {
relayCancel, err = topic.Relay()
if err != nil {
t.Fatal(err)
}
},
func() {
relayCancel()
},
)
}
func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic), closeResource func()) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -510,6 +527,134 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) {
notifSubThenUnSub(ctx, t, topics)
}
func TestTopicRelay(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
const topic = "foobar"
const numHosts = 5
hosts := getNetHosts(t, ctx, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
// [0.Rel] - [1.Rel] - [2.Sub]
// |
// [3.Rel] - [4.Sub]
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
connect(t, hosts[1], hosts[3])
connect(t, hosts[3], hosts[4])
time.Sleep(time.Millisecond * 100)
var subs []*Subscription
for i, topic := range topics {
if i == 2 || i == 4 {
sub, err := topic.Subscribe()
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub)
} else {
_, err := topic.Relay()
if err != nil {
t.Fatal(err)
}
}
}
time.Sleep(time.Millisecond * 100)
for i := 0; i < 100; i++ {
msg := []byte("message")
owner := rand.Intn(len(topics))
err := topics[owner].Publish(ctx, msg)
if err != nil {
t.Fatal(err)
}
for _, sub := range subs {
received, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(msg, received.Data) {
t.Fatal("received message is other than expected")
}
}
}
}
func TestTopicRelayReuse(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const topic = "foobar"
const numHosts = 1
hosts := getNetHosts(t, ctx, numHosts)
pubsubs := getPubsubs(ctx, hosts)
topics := getTopics(pubsubs, topic)
relay1Cancel, err := topics[0].Relay()
if err != nil {
t.Fatal(err)
}
relay2Cancel, err := topics[0].Relay()
if err != nil {
t.Fatal(err)
}
relay3Cancel, err := topics[0].Relay()
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
if pubsubs[0].myRelays[topic] != 3 {
t.Fatal("incorrect number of relays")
}
relay1Cancel()
relay2Cancel()
relay3Cancel()
time.Sleep(time.Millisecond * 100)
if pubsubs[0].myRelays[topic] != 0 {
t.Fatal("incorrect number of relays")
}
}
func TestTopicRelayOnClosedTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const topic = "foobar"
const numHosts = 1
hosts := getNetHosts(t, ctx, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
err := topics[0].Close()
if err != nil {
t.Fatal(err)
}
_, err = topics[0].Relay()
if err == nil {
t.Fatalf("error should be returned")
}
}
func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) {
primaryTopic := topics[0]
msgs := make([]*Subscription, len(topics))