chore: adding relay test (#8)

This commit is contained in:
gabrielmer 2024-12-13 17:26:49 +01:00 committed by GitHub
parent 81b59bcbeb
commit 7eb7647599
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 103 additions and 9 deletions

View File

@ -424,6 +424,11 @@ func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
return w.node.Connect(ctx, address) return w.node.Connect(ctx, address)
} }
// TODO: change pubsub topic to shard notation everywhere
func (w *Waku) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
return w.node.RelayPublish(ctx, message, pubsubTopic)
}
func (w *Waku) DialPeerByID(peerID peer.ID, protocol libp2pproto.ID) error { func (w *Waku) DialPeerByID(peerID peer.ID, protocol libp2pproto.ID) error {
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
defer cancel() defer cancel()
@ -1241,3 +1246,7 @@ func getContextTimeoutMilliseconds(ctx context.Context) int {
} }
return 0 return 0
} }
func FormatWakuRelayTopic(clusterId uint16, shard uint16) string {
return fmt.Sprintf("/waku/2/rs/%d/%d", clusterId, shard)
}

View File

@ -4,15 +4,18 @@ import (
"context" "context"
"errors" "errors"
"slices" "slices"
"sync"
"testing" "testing"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/cenkalti/backoff/v3" "github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
) )
@ -190,7 +193,7 @@ func TestPeerExchange(t *testing.T) {
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
PeerExchange: false, PeerExchange: false,
Discv5UdpPort: 9001, Discv5UdpPort: 9010,
TcpPort: 60010, TcpPort: 60010,
} }
@ -214,7 +217,7 @@ func TestPeerExchange(t *testing.T) {
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
PeerExchange: true, PeerExchange: true,
Discv5UdpPort: 9000, Discv5UdpPort: 9011,
Discv5BootstrapNodes: []string{discv5NodeEnr.String()}, Discv5BootstrapNodes: []string{discv5NodeEnr.String()},
TcpPort: 60011, TcpPort: 60011,
} }
@ -259,7 +262,7 @@ func TestPeerExchange(t *testing.T) {
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
PeerExchange: true, PeerExchange: true,
Discv5UdpPort: 9002, Discv5UdpPort: 9012,
TcpPort: 60012, TcpPort: 60012,
PeerExchangeNode: serverNodeMa[0].String(), PeerExchangeNode: serverNodeMa[0].String(),
} }
@ -318,8 +321,8 @@ func TestDnsDiscover(t *testing.T) {
LogLevel: "DEBUG", LogLevel: "DEBUG",
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
Discv5UdpPort: 9040, Discv5UdpPort: 9020,
TcpPort: 60040, TcpPort: 60020,
} }
node, err := New(&nodeWakuConfig, logger.Named("node")) node, err := New(&nodeWakuConfig, logger.Named("node"))
require.NoError(t, err) require.NoError(t, err)
@ -347,8 +350,8 @@ func TestDial(t *testing.T) {
Discv5Discovery: false, Discv5Discovery: false,
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
Discv5UdpPort: 9020, Discv5UdpPort: 9030,
TcpPort: 60020, TcpPort: 60030,
} }
dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode")) dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode"))
@ -363,8 +366,8 @@ func TestDial(t *testing.T) {
Discv5Discovery: false, Discv5Discovery: false,
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
Discv5UdpPort: 9021, Discv5UdpPort: 9031,
TcpPort: 60021, TcpPort: 60031,
} }
receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode")) receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err) require.NoError(t, err)
@ -395,3 +398,85 @@ func TestDial(t *testing.T) {
require.NoError(t, dialerNode.Stop()) require.NoError(t, dialerNode.Stop())
require.NoError(t, receiverNode.Stop()) require.NoError(t, receiverNode.Stop())
} }
func TestRelay(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
// start node that will send the message
senderNodeWakuConfig := WakuConfig{
Relay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9040,
TcpPort: 60040,
}
senderNode, err := New(&senderNodeWakuConfig, logger.Named("senderNode"))
require.NoError(t, err)
require.NoError(t, senderNode.Start())
time.Sleep(1 * time.Second)
// start node that will receive the message
receiverNodeWakuConfig := WakuConfig{
Relay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9041,
TcpPort: 60041,
}
receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
time.Sleep(1 * time.Second)
receiverMultiaddr, err := receiverNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, receiverMultiaddr)
// Dial so they become peers
err = senderNode.DialPeer(receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
senderPeerCount, err := senderNode.PeerCount()
require.NoError(t, err)
require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer")
receiverPeerCount, err := receiverNode.PeerCount()
require.NoError(t, err)
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
message := &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5, 6},
ContentTopic: "test-content-topic",
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}
// send message
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
senderNode.RelayPublish(ctx, message, pubsubTopic)
wg := sync.WaitGroup{}
wg.Add(1)
// Wait to receive message
select {
case envelope := <-receiverNode.node.MsgChan:
require.NotNil(t, envelope, "Envelope should be received")
require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match")
require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match")
wg.Done()
case <-time.After(10 * time.Second):
t.Fatal("Timeout: No message received within 10 seconds")
}
wg.Wait()
// Stop nodes
require.NoError(t, senderNode.Stop())
require.NoError(t, receiverNode.Stop())
}