go-waku/waku/v2/node/wakunode2_test.go

323 lines
8.3 KiB
Go
Raw Normal View History

2021-11-12 10:19:42 +01:00
package node
import (
2022-03-18 15:50:10 -04:00
"bytes"
2021-11-12 10:19:42 +01:00
"context"
2022-03-18 15:50:10 -04:00
"math/big"
2021-11-12 10:19:42 +01:00
"net"
"sync"
2021-11-12 10:19:42 +01:00
"testing"
"time"
2021-11-12 10:19:42 +01:00
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prometheus/client_golang/prometheus"
2021-11-12 10:19:42 +01:00
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
2021-11-12 10:19:42 +01:00
)
// createTestMsg builds a WakuMessage fixture with the given version, a fixed
// three-byte payload, the timestamp 123456 and the content topic "abc".
func createTestMsg(version uint32) *pb.WakuMessage {
	return &pb.WakuMessage{
		Payload:      []byte{0, 1, 2},
		Version:      proto.Uint32(version),
		Timestamp:    proto.Int64(123456),
		ContentTopic: "abc",
	}
}
2021-11-12 10:19:42 +01:00
func TestWakuNode2(t *testing.T) {
hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key, err := tests.RandomHex(32)
require.NoError(t, err)
prvKey, err := crypto.HexToECDSA(key)
require.NoError(t, err)
ctx := context.Background()
2023-01-06 18:37:57 -04:00
wakuNode, err := New(
2021-11-12 10:19:42 +01:00
WithPrivateKey(prvKey),
2021-11-17 12:19:42 -04:00
WithHostAddress(hostAddr),
2021-11-12 10:19:42 +01:00
WithWakuRelay(),
)
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
err = wakuNode.Start(ctx)
2021-11-12 10:19:42 +01:00
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
_, err = wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
require.NoError(t, err)
time.Sleep(time.Second * 1)
err = wakuNode.Relay().Unsubscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
defer wakuNode.Stop()
2021-11-12 10:19:42 +01:00
}
2022-03-18 15:50:10 -04:00
// int2Bytes encodes i as the big-endian bytes of its absolute value followed
// by one marker byte: 1 when i is strictly positive, 0 otherwise.
func int2Bytes(i int) []byte {
	marker := byte(0)
	if i > 0 {
		marker = 1
	}
	magnitude := new(big.Int).SetInt64(int64(i)).Bytes()
	return append(magnitude, marker)
}
func TestUpAndDown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key1, _ := tests.RandomHex(32)
prvKey1, _ := crypto.HexToECDSA(key1)
nodes, err := dnsdisc.RetrieveNodes(context.Background(), "enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im")
require.NoError(t, err)
var bootnodes []*enode.Node
for _, n := range nodes {
if n.ENR != nil {
bootnodes = append(bootnodes, n.ENR)
}
}
wakuNode1, err := New(
WithPrivateKey(prvKey1),
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithDiscoveryV5(0, bootnodes, true),
)
require.NoError(t, err)
for i := 0; i < 5; i++ {
utils.Logger().Info("Starting...", zap.Int("iteration", i))
err = wakuNode1.Start(ctx)
require.NoError(t, err)
err = wakuNode1.DiscV5().Start(ctx)
require.NoError(t, err)
utils.Logger().Info("Started!", zap.Int("iteration", i))
time.Sleep(3 * time.Second)
utils.Logger().Info("Stopping...", zap.Int("iteration", i))
wakuNode1.Stop()
utils.Logger().Info("Stopped!", zap.Int("iteration", i))
}
}
func Test500(t *testing.T) {
maxMsgs := 500
2022-03-18 15:50:10 -04:00
maxMsgBytes := int2Bytes(maxMsgs)
2022-05-27 15:58:38 -04:00
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key1, _ := tests.RandomHex(32)
prvKey1, _ := crypto.HexToECDSA(key1)
hostAddr2, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key2, _ := tests.RandomHex(32)
prvKey2, _ := crypto.HexToECDSA(key2)
2023-01-06 18:37:57 -04:00
wakuNode1, err := New(
WithPrivateKey(prvKey1),
WithHostAddress(hostAddr1),
WithWakuRelay(),
)
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()
2023-01-06 18:37:57 -04:00
wakuNode2, err := New(
WithPrivateKey(prvKey2),
WithHostAddress(hostAddr2),
WithWakuRelay(),
)
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
err = wakuNode2.Start(ctx)
require.NoError(t, err)
defer wakuNode2.Stop()
err = wakuNode2.DialPeer(ctx, wakuNode1.ListenAddresses()[0].String())
require.NoError(t, err)
time.Sleep(2 * time.Second)
sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
2022-11-25 16:54:11 -04:00
ticker := time.NewTimer(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
2022-03-18 15:50:10 -04:00
require.Fail(t, "Timeout Sub1")
case msg := <-sub1[0].Ch:
2022-12-21 14:56:58 -04:00
if msg == nil {
return
}
2022-03-18 15:50:10 -04:00
if bytes.Equal(msg.Message().Payload, maxMsgBytes) {
return
}
}
}
}()
go func() {
defer wg.Done()
2022-11-25 16:54:11 -04:00
ticker := time.NewTimer(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
2022-03-18 15:50:10 -04:00
require.Fail(t, "Timeout Sub2")
case msg := <-sub2[0].Ch:
2022-12-21 14:56:58 -04:00
if msg == nil {
return
}
2022-03-18 15:50:10 -04:00
if bytes.Equal(msg.Message().Payload, maxMsgBytes) {
return
}
}
}
}()
go func() {
defer wg.Done()
2022-03-18 15:50:10 -04:00
for i := 1; i <= maxMsgs; i++ {
msg := createTestMsg(0)
2022-03-18 15:50:10 -04:00
msg.Payload = int2Bytes(i)
msg.Timestamp = proto.Int64(int64(i))
if _, err := wakuNode2.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil {
require.Fail(t, "Could not publish all messages")
}
time.Sleep(5 * time.Millisecond)
}
}()
wg.Wait()
}
func TestDecoupledStoreFromRelay(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// NODE1: Relay Node + Filter Server
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithLegacyWakuFilter(true),
)
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()
subs, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
defer subs[0].Unsubscribe()
// NODE2: Filter Client/Store
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))
require.NoError(t, err)
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
wakuNode2, err := New(
WithHostAddress(hostAddr2),
WithLegacyWakuFilter(false),
WithWakuStore(),
WithMessageProvider(dbStore),
)
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
err = wakuNode2.Start(ctx)
require.NoError(t, err)
defer wakuNode2.Stop()
err = wakuNode2.DialPeerWithMultiAddress(ctx, wakuNode1.ListenAddresses()[0])
require.NoError(t, err)
time.Sleep(2 * time.Second)
_, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{
Topic: string(relay.DefaultWakuTopic),
ContentTopics: []string{"abc"},
}, legacy_filter.WithPeer(wakuNode1.host.ID()))
require.NoError(t, err)
// Sleep to make sure the filter is subscribed
time.Sleep(1 * time.Second)
// Send MSG1 on NODE1
msg := createTestMsg(0)
msg.Payload = []byte{1, 2, 3, 4, 5}
msg.Timestamp = utils.GetUnixEpoch()
var wg sync.WaitGroup
wg.Add(1)
go func() {
// MSG1 should be pushed in NODE2 via filter
defer wg.Done()
env, ok := <-filter.Chan
if !ok {
require.Fail(t, "no message")
}
require.Equal(t, msg.Timestamp, env.Message().Timestamp)
}()
time.Sleep(500 * time.Millisecond)
if _, err := wakuNode1.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil {
require.Fail(t, "Could not publish all messages")
}
wg.Wait()
// NODE3: Query from NODE2
hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
wakuNode3, err := New(
WithHostAddress(hostAddr3),
WithLegacyWakuFilter(false),
)
require.NoError(t, err)
2023-01-06 18:37:57 -04:00
err = wakuNode3.Start(ctx)
require.NoError(t, err)
defer wakuNode3.Stop()
_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4)
require.NoError(t, err)
time.Sleep(2 * time.Second)
// NODE2 should have returned the message received via filter
result, err := wakuNode3.Store().Query(ctx, store.Query{})
require.NoError(t, err)
require.Len(t, result.Messages, 1)
require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp)
}