go-waku/waku/v2/protocol/relay/waku_relay_test.go

432 lines
12 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package relay
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
const defaultTestPubSubTopic = "/waku/2/go/relay/test"
const defaultTestContentTopic = "/test/10/my-app"
func TestWakuRelay(t *testing.T) {
testTopic := defaultTestPubSubTopic
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(testTopic), true)
topics := relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, testTopic, topics[0])
ctx, cancel := context.WithCancel(context.Background())
bytesToSend := []byte{1}
go func() {
defer cancel()
env := <-subs[0].Ch
t.Log("received msg", logging.Hash(env.Hash()))
}()
msg := &pb.WakuMessage{
Payload: bytesToSend,
ContentTopic: "test",
}
_, err = relay.Publish(context.Background(), msg, WithPubSubTopic(testTopic))
require.NoError(t, err)
time.Sleep(2 * time.Second)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(testTopic))
require.NoError(t, err)
<-ctx.Done()
}
func createRelayNode(t *testing.T) (host.Host, *WakuRelay) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = bcaster.Start(context.Background())
require.NoError(t, err)
return host, relay
}
func TestGossipsubScore(t *testing.T) {
testTopic := defaultTestPubSubTopic
hosts := make([]host.Host, 5)
relay := make([]*WakuRelay, 5)
for i := 0; i < 5; i++ {
hosts[i], relay[i] = createRelayNode(t)
err := relay[i].Start(context.Background())
require.NoError(t, err)
}
for i := 0; i < 5; i++ {
for j := 0; j < 5; j++ {
if i == j {
continue
}
hosts[i].Peerstore().AddAddrs(hosts[j].ID(), hosts[j].Addrs(), peerstore.PermanentAddrTTL)
err := hosts[i].Connect(context.Background(), hosts[j].Peerstore().PeerInfo(hosts[j].ID()))
require.NoError(t, err)
}
topicData, err := relay[i].subscribeToPubsubTopic(testTopic)
require.NoError(t, err)
go func() {
for {
_, err := topicData.subscription.Next(context.Background())
if err != nil {
t.Log(err)
}
}
}()
}
time.Sleep(2 * time.Second)
for i := 0; i < 5; i++ {
require.Len(t, hosts[i].Network().Conns(), 4)
}
// We obtain the go-libp2p topic directly because we normally can't publish anything other than WakuMessages
pubsubTopic, err := relay[0].upsertTopic(testTopic)
require.NoError(t, err)
// Removing validator from relayer0 to allow it to send invalid messages
err = relay[0].pubsub.UnregisterTopicValidator(testTopic)
require.NoError(t, err)
for i := 0; i < 50; i++ {
buf := make([]byte, 1000)
_, err := rand.Read(buf)
require.NoError(t, err)
err = pubsubTopic.Publish(context.Background(), buf)
require.NoError(t, err)
}
// long wait, must be higher than the configured decayInterval (how often score is updated)
time.Sleep(20 * time.Second)
// nodes[0] was blacklisted from all other peers, no connections
require.Len(t, hosts[0].Network().Conns(), 0)
for i := 1; i < 5; i++ {
require.Len(t, hosts[i].Network().Conns(), 3)
}
}
func TestMsgID(t *testing.T) {
expectedMsgIDBytes := []byte{208, 214, 63, 55, 144, 6, 206, 39, 40, 251, 138, 74, 66, 168, 43, 32, 91, 94, 149, 122, 237, 198, 149, 87, 232, 156, 197, 34, 53, 131, 78, 112}
topic := "abcde"
msg := &pubsub_pb.Message{
Data: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
Topic: &topic,
}
msgID := msgIDFn(msg)
require.Equal(t, expectedMsgIDBytes, []byte(msgID))
}
func waitForTimeout(t *testing.T, ch chan *protocol.Envelope) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case env, ok := <-ch:
if ok {
t.Error("should not receive another message with payload", string(env.Message().Payload))
}
case <-time.After(2 * time.Second):
// Timeout elapsed, all good
}
}()
wg.Wait()
}
func waitForMsg(t *testing.T, ch chan *protocol.Envelope, cTopicExpected string) *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case env := <-ch:
require.Equal(t, cTopicExpected, env.Message().GetContentTopic())
case <-time.After(5 * time.Second):
t.Error("Message timeout")
}
}()
return &wg
}
func TestWakuRelayAutoShard(t *testing.T) {
testcTopic := "/toychat/2/huilong/proto"
testcTopic1 := "/toychat/1/huilong/proto"
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
defer bcaster.Stop()
//Create a contentTopic level subscription
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)
sub, err := relay.GetSubscription(testcTopic)
require.NoError(t, err)
_, ok := sub.contentFilter.ContentTopics[testcTopic]
require.Equal(t, true, ok)
_, err = relay.GetSubscription(testcTopic1)
require.Error(t, err)
topics := relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, subs[0].contentFilter.PubsubTopic, topics[0])
ctx, cancel := context.WithCancel(context.Background())
bytesToSend := []byte{1}
defer cancel()
//Create a pubSub level subscription
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
msg := &pb.WakuMessage{
Payload: bytesToSend,
ContentTopic: testcTopic,
}
_, err = relay.Publish(context.Background(), msg)
require.NoError(t, err)
wg := waitForMsg(t, subs[0].Ch, testcTopic)
wg.Wait()
wg = waitForMsg(t, subs1[0].Ch, testcTopic)
wg.Wait()
//Test publishing to different content-topic
msg1 := &pb.WakuMessage{
Payload: bytesToSend,
ContentTopic: testcTopic1,
}
_, err = relay.Publish(context.Background(), msg1, WithPubSubTopic(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
wg = waitForMsg(t, subs1[0].Ch, testcTopic1)
wg.Wait()
//Should not receive message as subscription is for a different cTopic.
waitForTimeout(t, subs[0].Ch)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
_, err = relay.GetSubscription(testcTopic)
require.Error(t, err)
_, err = relay.GetSubscription(testcTopic1)
require.Error(t, err)
topics = relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, subs[0].contentFilter.PubsubTopic, topics[0])
wg2 := waitForMsg(t, subs1[0].Ch, testcTopic1)
msg2 := &pb.WakuMessage{
Payload: bytesToSend,
ContentTopic: testcTopic1,
Timestamp: utils.GetUnixEpoch(),
}
_, err = relay.Publish(context.Background(), msg2, WithPubSubTopic(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
wg2.Wait()
err = relay.Unsubscribe(ctx, protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
}
func TestInvalidMessagePublish(t *testing.T) {
testTopic := defaultTestPubSubTopic
testContentTopic := defaultTestContentTopic
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second)
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)
// Test empty contentTopic
_, err = relay.Publish(ctx, tests.CreateWakuMessage("", utils.GetUnixEpoch(), "test_payload"), WithPubSubTopic(testTopic))
require.Error(t, err)
// Test empty payload
_, err = relay.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch(), ""), WithPubSubTopic(testTopic))
require.Error(t, err)
// Test empty contentTopic and payload
_, err = relay.Publish(ctx, tests.CreateWakuMessage("", utils.GetUnixEpoch(), ""), WithPubSubTopic(testTopic))
require.Error(t, err)
// Test Meta size over limit
message := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch(), "test_payload")
message.Meta = make([]byte, 65)
_, err = relay.Publish(ctx, message, WithPubSubTopic(testTopic))
require.Error(t, err)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
ctxCancel()
}
func TestWakuRelayStaticSharding(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Follow spec /waku/2/rs/<cluster_id>/<shard_number>
testTopic := "/waku/2/rs/64/0"
testContentTopic := "/test/10/my-relay"
// Host1
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host1, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster1 := NewBroadcaster(10)
relay1 := NewWakuRelay(bcaster1, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay1.SetHost(host1)
err = relay1.Start(context.Background())
require.NoError(t, err)
err = bcaster1.Start(context.Background())
require.NoError(t, err)
defer relay1.Stop()
// Host2
port, err = tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host2, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster2 := NewBroadcaster(10)
relay2 := NewWakuRelay(bcaster2, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay2.SetHost(host2)
err = relay2.Start(context.Background())
require.NoError(t, err)
err = bcaster2.Start(context.Background())
require.NoError(t, err)
defer relay2.Stop()
// Connect nodes
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), WakuRelayID_v200)
require.NoError(t, err)
// Wait for the mesh connection to happen between node1 and node2
time.Sleep(2 * time.Second)
// Subscribe to valid static shard topic on both hosts
subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err)
subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err)
require.True(t, relay2.IsSubscribed(testTopic))
require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])
msg := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch(), "test_payload")
// Test publish from host2 using autosharding -> should fail on topic format
_, err = relay2.Publish(ctx, msg)
require.Error(t, err)
// Test publish from host2 using static sharding -> should succeed
_, err = relay2.Publish(ctx, msg, WithPubSubTopic(testTopic))
require.NoError(t, err)
var wg sync.WaitGroup
// Msg should get received on host1
tests.WaitForMsg(t, 2*time.Second, &wg, subs1[0].Ch)
}