mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-11 10:23:10 +00:00
Merge pull request #33 from waku-org/Test_utils_P2
Adding helper functions & relay tests
This commit is contained in:
commit
2b0118ab08
68
waku/nodes_basic_test.go
Normal file
68
waku/nodes_basic_test.go
Normal file
@ -0,0 +1,68 @@
|
||||
package waku
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBasicWakuNodes(t *testing.T) {
|
||||
Debug("Starting TestBasicWakuNodes")
|
||||
|
||||
nodeCfg := DefaultWakuConfig
|
||||
nodeCfg.Relay = true
|
||||
|
||||
Debug("Starting the WakuNode")
|
||||
node, err := StartWakuNode("node", &nodeCfg)
|
||||
require.NoError(t, err, "Failed to create the WakuNode")
|
||||
|
||||
// Use defer to ensure proper cleanup
|
||||
defer func() {
|
||||
node.StopAndDestroy()
|
||||
}()
|
||||
|
||||
Debug("Successfully created the WakuNode")
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
Debug("TestBasicWakuNodes completed successfully")
|
||||
}
|
||||
|
||||
func TestNodeRestart(t *testing.T) {
|
||||
Debug("Starting TestNodeRestart")
|
||||
|
||||
Debug("Creating Node")
|
||||
nodeConfig := DefaultWakuConfig
|
||||
node, err := StartWakuNode("TestNode", &nodeConfig)
|
||||
require.NoError(t, err, "Failed to start Waku node")
|
||||
defer node.StopAndDestroy()
|
||||
|
||||
Debug("Node started successfully")
|
||||
|
||||
Debug("Fetching ENR before stopping the node")
|
||||
enrBefore, err := node.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR before stopping")
|
||||
require.NotEmpty(t, enrBefore, "ENR should not be empty before stopping")
|
||||
Debug("ENR before stopping: %s", enrBefore)
|
||||
|
||||
Debug("Stopping the Node")
|
||||
err = node.Stop()
|
||||
require.NoError(t, err, "Failed to stop Waku node")
|
||||
Debug("Node stopped successfully")
|
||||
|
||||
Debug("Restarting the Node")
|
||||
err = node.Start()
|
||||
require.NoError(t, err, "Failed to restart Waku node")
|
||||
Debug("Node restarted successfully")
|
||||
|
||||
Debug("Fetching ENR after restarting the node")
|
||||
enrAfter, err := node.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR after restarting")
|
||||
require.NotEmpty(t, enrAfter, "ENR should not be empty after restart")
|
||||
Debug("ENR after restarting: %s", enrAfter)
|
||||
|
||||
Debug("Comparing ENRs before and after restart")
|
||||
require.Equal(t, enrBefore, enrAfter, "ENR should remain the same after node restart")
|
||||
|
||||
Debug("TestNodeRestart completed successfully")
|
||||
}
|
||||
@ -617,11 +617,13 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
return nil
|
||||
}
|
||||
|
||||
errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
return errors.New(errMsg)
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
|
||||
return errors.New("error WakuRelaySubscribe: " + errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error {
|
||||
@ -657,7 +659,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk
|
||||
|
||||
func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
|
||||
if pubsubTopic == "" {
|
||||
return errors.New("pubsub topic is empty")
|
||||
err := errors.New("pubsub topic is empty")
|
||||
Error("Failed to unsubscribe from relay: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
@ -673,15 +677,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp)
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
|
||||
Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
return nil
|
||||
}
|
||||
|
||||
errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
return errors.New(errMsg)
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
|
||||
return errors.New("error WakuRelayUnsubscribe: " + errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
|
||||
@ -697,16 +705,20 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
|
||||
numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64)
|
||||
if err != nil {
|
||||
Error("Failed to parse number of received peers: %v", err)
|
||||
return 0, err
|
||||
}
|
||||
return numRecvPeers, nil
|
||||
}
|
||||
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("PeerExchangeRequest failed: %v", errMsg)
|
||||
return 0, errors.New(errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) StartDiscV5() error {
|
||||
|
||||
Debug("Starting DiscV5 for node: %s", n.nodeName)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
var resp = C.allocResp(unsafe.Pointer(&wg))
|
||||
@ -716,9 +728,11 @@ func (n *WakuNode) StartDiscV5() error {
|
||||
C.cGoWakuStartDiscV5(n.wakuCtx, resp)
|
||||
wg.Wait()
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully started DiscV5 for node: %s", n.nodeName)
|
||||
return nil
|
||||
}
|
||||
errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -733,9 +747,11 @@ func (n *WakuNode) StopDiscV5() error {
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully stopped DiscV5 for node: %s", n.nodeName)
|
||||
return nil
|
||||
}
|
||||
errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -751,11 +767,13 @@ func (n *WakuNode) Version() (string, error) {
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version)
|
||||
return version, nil
|
||||
}
|
||||
|
||||
errMsg := "error WakuVersion: " +
|
||||
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg)
|
||||
return "", errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -832,6 +850,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
|
||||
return common.MessageHash(""), errors.New(errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) {
|
||||
if n == nil {
|
||||
err := errors.New("cannot publish message; node is nil")
|
||||
Error("Failed to publish message via relay: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Handling context internally with a timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
Debug("Attempting to publish message via relay on node %s", n.nodeName)
|
||||
|
||||
msgHash, err := n.RelayPublish(ctx, message, pubsubTopic)
|
||||
if err != nil {
|
||||
Error("Failed to publish message via relay on node %s: %v", n.nodeName, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String())
|
||||
return msgHash, nil
|
||||
}
|
||||
|
||||
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
@ -956,7 +997,7 @@ func (n *WakuNode) Destroy() error {
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully destroyed " + n.nodeName)
|
||||
Debug("Successfully destroyed %s", n.nodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1284,6 +1325,19 @@ func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, er
|
||||
nodeCfg = *customCfg
|
||||
}
|
||||
|
||||
tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort)
|
||||
if err != nil {
|
||||
Error("Failed to allocate unique ports: %v", err)
|
||||
tcpPort, udpPort = 0, 0
|
||||
}
|
||||
|
||||
if nodeCfg.TcpPort == 0 {
|
||||
nodeCfg.TcpPort = tcpPort
|
||||
}
|
||||
if nodeCfg.Discv5UdpPort == 0 {
|
||||
nodeCfg.Discv5UdpPort = udpPort
|
||||
}
|
||||
|
||||
Debug("Creating %s", nodeName)
|
||||
node, err := NewWakuNode(&nodeCfg, nodeName)
|
||||
if err != nil {
|
||||
@ -1302,6 +1356,7 @@ func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, er
|
||||
}
|
||||
|
||||
func (n *WakuNode) StopAndDestroy() error {
|
||||
Debug("Stopping and destroying Node")
|
||||
if n == nil {
|
||||
err := errors.New("waku node is nil")
|
||||
Error("Failed to stop and destroy: %v", err)
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
package waku
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -10,6 +12,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v3"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/waku-go-bindings/waku/common"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type NwakuInfo struct {
|
||||
@ -77,3 +82,136 @@ func RetryWithBackOff(o func() error, options ...BackOffOption) error {
|
||||
b.Reset()
|
||||
return backoff.Retry(o, &b)
|
||||
}
|
||||
|
||||
func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage {
|
||||
Debug("Creating a WakuMessage on node %s", n.nodeName)
|
||||
|
||||
if len(customMessage) > 0 && customMessage[0] != nil {
|
||||
Debug("Using provided custom message on node %s", n.nodeName)
|
||||
return customMessage[0]
|
||||
}
|
||||
|
||||
Debug("Using default message format on node %s", n.nodeName)
|
||||
defaultMessage := &pb.WakuMessage{
|
||||
Payload: []byte("This is a default Waku message payload"),
|
||||
ContentTopic: "test-content-topic",
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
}
|
||||
|
||||
Debug("Successfully created a default WakuMessage on node %s", n.nodeName)
|
||||
return defaultMessage
|
||||
}
|
||||
|
||||
func WaitForAutoConnection(nodeList []*WakuNode) error {
|
||||
Debug("Waiting for auto-connection of nodes...")
|
||||
|
||||
options := func(b *backoff.ExponentialBackOff) {
|
||||
b.MaxElapsedTime = 30 * time.Second
|
||||
}
|
||||
|
||||
err := RetryWithBackOff(func() error {
|
||||
for _, node := range nodeList {
|
||||
peers, err := node.GetConnectedPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(peers) < 1 {
|
||||
return errors.New("expected at least one connected peer") // Retry
|
||||
}
|
||||
|
||||
Debug("Node %s has %d connected peers", node.nodeName, len(peers))
|
||||
}
|
||||
|
||||
return nil
|
||||
}, options)
|
||||
|
||||
if err != nil {
|
||||
Error("Auto-connection failed after retries: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
Debug("Auto-connection check completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash, timeout ...time.Duration) error {
|
||||
|
||||
var verifyTimeout time.Duration
|
||||
if len(timeout) > 0 {
|
||||
verifyTimeout = timeout[0]
|
||||
} else {
|
||||
verifyTimeout = DefaultTimeOut
|
||||
}
|
||||
|
||||
Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, verifyTimeout)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), verifyTimeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case envelope := <-n.MsgChan:
|
||||
if envelope == nil {
|
||||
Error("Received envelope is nil on node %s", n.nodeName)
|
||||
return errors.New("received envelope is nil")
|
||||
}
|
||||
if string(expectedMessage.Payload) != string(envelope.Message().Payload) {
|
||||
Error("Payload does not match on node %s", n.nodeName)
|
||||
return errors.New("payload does not match")
|
||||
}
|
||||
if expectedMessage.ContentTopic != envelope.Message().ContentTopic {
|
||||
Error("Content topic does not match on node %s", n.nodeName)
|
||||
return errors.New("content topic does not match")
|
||||
}
|
||||
if expectedHash != envelope.Hash() {
|
||||
Error("Message hash does not match on node %s", n.nodeName)
|
||||
return errors.New("message hash does not match")
|
||||
}
|
||||
Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload))
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
Error("Timeout: message not received within %v on node %s", verifyTimeout, n.nodeName)
|
||||
return errors.New("timeout: message not received within the given duration")
|
||||
}
|
||||
}
|
||||
|
||||
func ConnectAllPeers(nodes []*WakuNode) error {
|
||||
if len(nodes) == 0 {
|
||||
Error("Cannot connect peers: node list is empty")
|
||||
return errors.New("node list is empty")
|
||||
}
|
||||
|
||||
timeout := time.Duration(len(nodes)*2) * time.Second
|
||||
Debug("Connecting nodes in a relay chain with timeout: %v", timeout)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
for i := 0; i < len(nodes)-1; i++ {
|
||||
Debug("Connecting node %d to node %d", i, i+1)
|
||||
err := nodes[i].ConnectPeer(nodes[i+1])
|
||||
if err != nil {
|
||||
Error("Failed to connect node %d to node %d: %v", i, i+1, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
Debug("Connections stabilized")
|
||||
return nil
|
||||
}
|
||||
|
||||
func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error {
|
||||
for _, node := range nodes {
|
||||
Debug("Subscribing node %s to topic %s", node.nodeName, topic)
|
||||
err := node.RelaySubscribe(topic)
|
||||
|
||||
if err != nil {
|
||||
Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err)
|
||||
return err
|
||||
}
|
||||
Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
227
waku/peer_connections_test.go
Normal file
227
waku/peer_connections_test.go
Normal file
@ -0,0 +1,227 @@
|
||||
package waku
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Test node connect & disconnect peers
|
||||
func TestDisconnectPeerNodes(t *testing.T) {
|
||||
Debug("Starting TestDisconnectPeerNodes")
|
||||
|
||||
nodeA, err := StartWakuNode("nodeA", nil)
|
||||
require.NoError(t, err, "Failed to start Node A")
|
||||
defer nodeA.StopAndDestroy()
|
||||
|
||||
nodeB, err := StartWakuNode("nodeB", nil)
|
||||
require.NoError(t, err, "Failed to start Node B")
|
||||
defer nodeB.StopAndDestroy()
|
||||
|
||||
Debug("Connecting Node A to Node B")
|
||||
err = nodeA.ConnectPeer(nodeB)
|
||||
require.NoError(t, err, "Failed to connect nodes")
|
||||
|
||||
Debug("Verifying connection between Node A and Node B")
|
||||
connectedPeers, err := nodeA.GetConnectedPeers()
|
||||
require.NoError(t, err, "Failed to get connected peers for Node A")
|
||||
nodeBPeerID, err := nodeB.PeerID()
|
||||
require.NoError(t, err, "Failed to get PeerID for Node B")
|
||||
require.True(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should be a peer of Node A before disconnection")
|
||||
|
||||
Debug("Disconnecting Node A from Node B")
|
||||
err = nodeA.DisconnectPeer(nodeB)
|
||||
require.NoError(t, err, "Failed to disconnect nodes")
|
||||
|
||||
Debug("Verifying disconnection between Node A and Node B")
|
||||
connectedPeers, err = nodeA.GetConnectedPeers()
|
||||
require.NoError(t, err, "Failed to get connected peers for Node A after disconnection")
|
||||
require.False(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should no longer be a peer of Node A after disconnection")
|
||||
Debug("Test completed successfully: Node B was disconnected from Node A")
|
||||
}
|
||||
|
||||
func TestConnectMultipleNodesToSingleNode(t *testing.T) {
|
||||
Debug("Starting TestConnectMultipleNodesToSingleNode")
|
||||
|
||||
Debug("Creating 3 nodes with automatically assigned ports")
|
||||
|
||||
node1, err := StartWakuNode("node1", nil)
|
||||
require.NoError(t, err, "Failed to start Node 1")
|
||||
defer func() {
|
||||
Debug("Stopping and destroying Node 1")
|
||||
node1.StopAndDestroy()
|
||||
}()
|
||||
|
||||
node2, err := StartWakuNode("node2", nil)
|
||||
require.NoError(t, err, "Failed to start Node 2")
|
||||
defer func() {
|
||||
Debug("Stopping and destroying Node 2")
|
||||
node2.StopAndDestroy()
|
||||
}()
|
||||
|
||||
node3, err := StartWakuNode("node3", nil)
|
||||
require.NoError(t, err, "Failed to start Node 3")
|
||||
defer func() {
|
||||
Debug("Stopping and destroying Node 3")
|
||||
node3.StopAndDestroy()
|
||||
}()
|
||||
|
||||
Debug("Connecting Node 2 to Node 1")
|
||||
err = node2.ConnectPeer(node1)
|
||||
require.NoError(t, err, "Failed to connect Node 2 to Node 1")
|
||||
|
||||
Debug("Connecting Node 3 to Node 1")
|
||||
err = node3.ConnectPeer(node1)
|
||||
require.NoError(t, err, "Failed to connect Node 3 to Node 1")
|
||||
|
||||
Debug("Verifying connected peers for Node 1")
|
||||
connectedPeers, err := node1.GetConnectedPeers()
|
||||
require.NoError(t, err, "Failed to get connected peers for Node 1")
|
||||
node3PeerID, err := node3.PeerID()
|
||||
require.NoError(t, err, "Failed to get PeerID for Node 1")
|
||||
node2PeerID, err := node2.PeerID()
|
||||
require.NoError(t, err, "Failed to get PeerID for Node 2")
|
||||
|
||||
require.True(t, slices.Contains(connectedPeers, node3PeerID), "Node 3 should be a peer of Node 1")
|
||||
require.True(t, slices.Contains(connectedPeers, node2PeerID), "Node 2 should be a peer of Node 1")
|
||||
|
||||
Debug("Test completed successfully: multiple nodes connected to a single node and verified peers")
|
||||
}
|
||||
|
||||
func TestDiscv5PeerMeshCount(t *testing.T) {
|
||||
Debug("Starting test to verify peer count in mesh using Discv5 after topic subscription")
|
||||
|
||||
node1Config := DefaultWakuConfig
|
||||
node1Config.Relay = true
|
||||
Debug("Creating Node1")
|
||||
node1, err := StartWakuNode("Node1", &node1Config)
|
||||
require.NoError(t, err, "Failed to start Node1")
|
||||
|
||||
enrNode1, err := node1.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node1")
|
||||
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Discv5BootstrapNodes = []string{enrNode1.String()}
|
||||
node2Config.Relay = true
|
||||
Debug("Creating Node2 with Node1 as Discv5 bootstrap")
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err, "Failed to start Node2")
|
||||
|
||||
require.NoError(t, err, "Failed to get ENR for Node2")
|
||||
|
||||
node3Config := DefaultWakuConfig
|
||||
node3Config.Discv5BootstrapNodes = []string{enrNode1.String()}
|
||||
node3Config.Relay = true
|
||||
|
||||
Debug("Creating Node3 with Node2 as Discv5 bootstrap")
|
||||
node3, err := StartWakuNode("Node3", &node3Config)
|
||||
require.NoError(t, err, "Failed to start Node3")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying all Waku nodes")
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
}()
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic)
|
||||
|
||||
err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic)
|
||||
require.NoError(t, err, "Failed to subscribe all nodes to the topic")
|
||||
|
||||
Debug("Waiting for nodes to auto-connect via Discv5")
|
||||
err = WaitForAutoConnection([]*WakuNode{node1, node2, node3})
|
||||
require.NoError(t, err, "Nodes did not auto-connect within timeout")
|
||||
|
||||
Debug("Fetching number of peers in mesh for Node1 before stopping Node3")
|
||||
peerCountBefore, err := node1.GetNumPeersInMesh(defaultPubsubTopic)
|
||||
require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3")
|
||||
|
||||
Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", peerCountBefore)
|
||||
require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3")
|
||||
|
||||
Debug("Stopping Node3")
|
||||
node3.StopAndDestroy()
|
||||
|
||||
Debug("Waiting for network update after Node3 stops")
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
Debug("Fetching number of peers in mesh for Node1 after stopping Node3")
|
||||
peerCountAfter, err := node1.GetNumPeersInMesh(defaultPubsubTopic)
|
||||
require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3")
|
||||
|
||||
Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", peerCountAfter)
|
||||
require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3")
|
||||
|
||||
Debug("Test successfully verified peer count change after stopping Node3")
|
||||
}
|
||||
|
||||
// this test commented as it will fail will be changed to have external ip in future task
|
||||
/*
|
||||
func TestDiscv5GetPeersConnected(t *testing.T) {
|
||||
Debug("Starting test to verify peer count in mesh with 4 nodes using Discv5 (Chained Connection)")
|
||||
|
||||
node1Config := DefaultWakuConfig
|
||||
node1Config.Relay = true
|
||||
|
||||
Debug("Creating Node1")
|
||||
node1, err := StartWakuNode("Node1", &node1Config)
|
||||
require.NoError(t, err, "Failed to start Node1")
|
||||
|
||||
enrNode1, err := node1.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node1")
|
||||
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Relay = true
|
||||
node2Config.Discv5BootstrapNodes = []string{enrNode1.String()}
|
||||
|
||||
Debug("Creating Node2 with Node1 as Discv5 bootstrap")
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err, "Failed to start Node2")
|
||||
|
||||
enrNode2, err := node2.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node2")
|
||||
|
||||
node3Config := DefaultWakuConfig
|
||||
node3Config.Relay = true
|
||||
node3Config.Discv5BootstrapNodes = []string{enrNode2.String()}
|
||||
|
||||
Debug("Creating Node3 with Node2 as Discv5 bootstrap")
|
||||
node3, err := StartWakuNode("Node3", &node3Config)
|
||||
require.NoError(t, err, "Failed to start Node3")
|
||||
|
||||
enrNode3, err := node3.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node3")
|
||||
|
||||
node4Config := DefaultWakuConfig
|
||||
node4Config.Relay = true
|
||||
node4Config.Discv5BootstrapNodes = []string{enrNode3.String()}
|
||||
|
||||
Debug("Creating Node4 with Node3 as Discv5 bootstrap")
|
||||
node4, err := StartWakuNode("Node4", &node4Config)
|
||||
require.NoError(t, err, "Failed to start Node4")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying all Waku nodes")
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
node3.StopAndDestroy()
|
||||
node4.StopAndDestroy()
|
||||
}()
|
||||
|
||||
Debug("Waiting for nodes to auto-connect via Discv5")
|
||||
err = WaitForAutoConnection([]*WakuNode{node1, node2, node3, node4})
|
||||
require.NoError(t, err, "Nodes did not auto-connect within timeout")
|
||||
|
||||
Debug("Fetching number of peers in connected to Node1")
|
||||
peerCount, err := node1.GetNumConnectedPeers()
|
||||
require.NoError(t, err, "Failed to get number of peers in mesh for Node1")
|
||||
|
||||
Debug("Total number of peers connected to Node1: %d", peerCount)
|
||||
require.Equal(t, 3, peerCount, "Expected Node1 to have exactly 3 peers in the mesh")
|
||||
|
||||
Debug("Test successfully verified peer count in mesh with 4 nodes using Discv5 (Chained Connection)")
|
||||
}
|
||||
*/
|
||||
509
waku/relay_test.go
Normal file
509
waku/relay_test.go
Normal file
@ -0,0 +1,509 @@
|
||||
package waku
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/waku-go-bindings/waku/common"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestVerifyNumConnectedRelayPeers(t *testing.T) {
|
||||
|
||||
node1Cfg := DefaultWakuConfig
|
||||
node1Cfg.Relay = true
|
||||
node1, err := StartWakuNode("node1", &node1Cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start node1: %v", err)
|
||||
}
|
||||
node2Cfg := DefaultWakuConfig
|
||||
node2Cfg.Relay = true
|
||||
node2, err := StartWakuNode("node2", &node2Cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start node2: %v", err)
|
||||
}
|
||||
|
||||
node3, err := StartWakuNode("node3", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start node3: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
node3.StopAndDestroy()
|
||||
}()
|
||||
|
||||
err = node2.ConnectPeer(node1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to connect node2 to node1: %v", err)
|
||||
}
|
||||
|
||||
err = node3.ConnectPeer(node1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to connect node3 to node1: %v", err)
|
||||
}
|
||||
connectedPeersNode1, err := node1.GetConnectedPeers()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get connected peers for node1: %v", err)
|
||||
}
|
||||
if len(connectedPeersNode1) != 2 {
|
||||
t.Fatalf("Expected 2 connected peers on node1, but got %d", len(connectedPeersNode1))
|
||||
}
|
||||
|
||||
numRelayPeers, err := node1.GetNumConnectedRelayPeers()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get connected relay peers for node1: %v", err)
|
||||
}
|
||||
if numRelayPeers != 1 {
|
||||
t.Fatalf("Expected 1 relay peer on node1, but got %d", numRelayPeers)
|
||||
}
|
||||
|
||||
t.Logf("Successfully connected node2 and node3 to node1. Relay Peers: %d, Total Peers: %d", numRelayPeers, len(connectedPeersNode1))
|
||||
}
|
||||
|
||||
func TestRelayMessageTransmission(t *testing.T) {
|
||||
Debug("Starting TestRelayMessageTransmission")
|
||||
|
||||
Debug("Creating Sender Node with Relay enabled")
|
||||
senderConfig := DefaultWakuConfig
|
||||
senderConfig.Relay = true
|
||||
|
||||
senderNode, err := StartWakuNode("SenderNode", &senderConfig)
|
||||
require.NoError(t, err, "Failed to start SenderNode")
|
||||
defer senderNode.StopAndDestroy()
|
||||
|
||||
Debug("Creating Receiver Node with Relay enabled")
|
||||
receiverConfig := DefaultWakuConfig
|
||||
receiverConfig.Relay = true
|
||||
|
||||
// Set the Receiver Node's discovery bootstrap node as SenderNode
|
||||
enrSender, err := senderNode.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for SenderNode")
|
||||
receiverConfig.Discv5BootstrapNodes = []string{enrSender.String()}
|
||||
|
||||
receiverNode, err := StartWakuNode("ReceiverNode", &receiverConfig)
|
||||
require.NoError(t, err, "Failed to start ReceiverNode")
|
||||
defer receiverNode.StopAndDestroy()
|
||||
|
||||
Debug("Waiting for nodes to auto-connect via Discv5")
|
||||
err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode})
|
||||
require.NoError(t, err, "Nodes did not auto-connect within timeout")
|
||||
|
||||
Debug("Creating and publishing message")
|
||||
message := senderNode.CreateMessage()
|
||||
var msgHash string
|
||||
|
||||
err = RetryWithBackOff(func() error {
|
||||
var err error
|
||||
msgHashObj, err := senderNode.RelayPublishNoCTX(DefaultPubsubTopic, message)
|
||||
if err == nil {
|
||||
msgHash = msgHashObj.String()
|
||||
}
|
||||
return err
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msgHash)
|
||||
|
||||
Debug("Verifying message reception")
|
||||
err = RetryWithBackOff(func() error {
|
||||
msgHashObj, _ := common.ToMessageHash(msgHash)
|
||||
return receiverNode.VerifyMessageReceived(message, msgHashObj)
|
||||
})
|
||||
require.NoError(t, err, "Message verification failed")
|
||||
|
||||
Debug("TestRelayMessageTransmission completed successfully")
|
||||
}
|
||||
|
||||
func TestRelayMessageBroadcast(t *testing.T) {
|
||||
Debug("Starting TestRelayMessageBroadcast")
|
||||
|
||||
numPeers := 5
|
||||
nodes := make([]*WakuNode, numPeers)
|
||||
nodeNames := []string{"SenderNode", "PeerNode1", "PeerNode2", "PeerNode3", "PeerNode4"}
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
|
||||
for i := 0; i < numPeers; i++ {
|
||||
Debug("Creating node %s", nodeNames[i])
|
||||
nodeConfig := DefaultWakuConfig
|
||||
nodeConfig.Relay = true
|
||||
if i > 0 {
|
||||
enrPrevNode, err := nodes[i-1].ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for node %s", nodeNames[i-1])
|
||||
nodeConfig.Discv5BootstrapNodes = []string{enrPrevNode.String()}
|
||||
}
|
||||
node, err := StartWakuNode(nodeNames[i], &nodeConfig)
|
||||
require.NoError(t, err)
|
||||
defer node.StopAndDestroy()
|
||||
nodes[i] = node
|
||||
}
|
||||
|
||||
WaitForAutoConnection(nodes)
|
||||
|
||||
senderNode := nodes[0]
|
||||
Debug("SenderNode is publishing a message")
|
||||
message := senderNode.CreateMessage()
|
||||
msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msgHash)
|
||||
|
||||
Debug("Waiting to ensure message delivery")
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
Debug("Verifying message reception for each node")
|
||||
for i, node := range nodes {
|
||||
Debug("Verifying message for node %s", nodeNames[i])
|
||||
err := node.VerifyMessageReceived(message, msgHash)
|
||||
require.NoError(t, err, "message verification failed for node: %s", nodeNames[i])
|
||||
}
|
||||
|
||||
Debug("TestRelayMessageBroadcast completed successfully")
|
||||
}
|
||||
|
||||
func TestSendmsgInvalidPayload(t *testing.T) {
|
||||
Debug("Starting TestInvalidMessageFormat")
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
|
||||
Debug("Creating nodes")
|
||||
senderNodeConfig := DefaultWakuConfig
|
||||
senderNodeConfig.Relay = true
|
||||
senderNode, err := StartWakuNode("SenderNode", &senderNodeConfig)
|
||||
require.NoError(t, err)
|
||||
defer senderNode.StopAndDestroy()
|
||||
|
||||
receiverNodeConfig := DefaultWakuConfig
|
||||
receiverNodeConfig.Relay = true
|
||||
enrNode2, err := senderNode.ENR()
|
||||
if err != nil {
|
||||
require.Error(t, err, "Can't find node ENR")
|
||||
}
|
||||
receiverNodeConfig.Discv5BootstrapNodes = []string{enrNode2.String()}
|
||||
receiverNode, err := StartWakuNode("receiverNode", &receiverNodeConfig)
|
||||
require.NoError(t, err)
|
||||
defer receiverNode.StopAndDestroy()
|
||||
|
||||
err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode})
|
||||
require.NoError(t, err, "Nodes did not auto-connect within timeout")
|
||||
|
||||
Debug("SenderNode is publishing an invalid message")
|
||||
invalidMessage := &pb.WakuMessage{
|
||||
Payload: []byte{},
|
||||
ContentTopic: "test-content-topic",
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
}
|
||||
|
||||
message := senderNode.CreateMessage(invalidMessage)
|
||||
var msgHash common.MessageHash
|
||||
msgHash, err = senderNode.RelayPublishNoCTX(defaultPubsubTopic, message)
|
||||
|
||||
Debug("Verifying if message was sent or failed")
|
||||
if err != nil {
|
||||
Debug("Message was not sent due to invalid format: %v", err)
|
||||
|
||||
} else {
|
||||
Debug("Message was unexpectedly sent: %s", msgHash.String())
|
||||
require.Fail(t, "message with invalid format should not be sent")
|
||||
}
|
||||
|
||||
Debug("TestInvalidMessageFormat completed")
|
||||
}
|
||||
|
||||
func TestRelayNodesNotConnectedDirectly(t *testing.T) {
|
||||
Debug("Starting TestRelayNodesNotConnectedDirectly")
|
||||
|
||||
Debug("Creating Sender Node with Relay enabled")
|
||||
senderConfig := DefaultWakuConfig
|
||||
senderConfig.Relay = true
|
||||
senderNode, err := StartWakuNode("SenderNode", &senderConfig)
|
||||
require.NoError(t, err)
|
||||
defer senderNode.StopAndDestroy()
|
||||
|
||||
Debug("Creating Relay-Enabled Receiver Node (Node2)")
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Relay = true
|
||||
|
||||
// Use static nodes instead of ENR
|
||||
node1Address, err := senderNode.ListenAddresses()
|
||||
require.NoError(t, err, "Failed to get sender node address")
|
||||
node2Config.Staticnodes = []string{node1Address[0].String()}
|
||||
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err)
|
||||
defer node2.StopAndDestroy()
|
||||
|
||||
Debug("Creating Relay-Enabled Receiver Node (Node3)")
|
||||
node3Config := DefaultWakuConfig
|
||||
node3Config.Relay = true
|
||||
|
||||
// Use static nodes instead of Discv5
|
||||
node2Address, err := node2.ListenAddresses()
|
||||
require.NoError(t, err, "Failed to get node2 address")
|
||||
node3Config.Staticnodes = []string{node2Address[0].String()}
|
||||
|
||||
node3, err := StartWakuNode("Node3", &node3Config)
|
||||
require.NoError(t, err)
|
||||
defer node3.StopAndDestroy()
|
||||
|
||||
Debug("Waiting for nodes to connect before proceeding")
|
||||
err = WaitForAutoConnection([]*WakuNode{senderNode, node2, node3})
|
||||
require.NoError(t, err, "Nodes did not connect within timeout")
|
||||
|
||||
Debug("SenderNode is publishing a message")
|
||||
message := senderNode.CreateMessage()
|
||||
msgHash, err := senderNode.RelayPublishNoCTX(DefaultPubsubTopic, message)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msgHash)
|
||||
|
||||
Debug("Verifying that Node2 received the message")
|
||||
err = node2.VerifyMessageReceived(message, msgHash)
|
||||
require.NoError(t, err, "Node2 should have received the message")
|
||||
|
||||
Debug("Verifying that Node3 received the message")
|
||||
err = node3.VerifyMessageReceived(message, msgHash)
|
||||
require.NoError(t, err, "Node3 should have received the message")
|
||||
|
||||
Debug("TestRelayNodesNotConnectedDirectly completed successfully")
|
||||
}
|
||||
|
||||
func TestRelaySubscribeAndPeerCountChange(t *testing.T) {
|
||||
Debug("Starting test to verify relay subscription and peer count change after stopping a node")
|
||||
|
||||
node1Config := DefaultWakuConfig
|
||||
node1Config.Relay = true
|
||||
|
||||
Debug("Creating Node1 with Relay enabled")
|
||||
node1, err := StartWakuNode("Node1", &node1Config)
|
||||
require.NoError(t, err, "Failed to start Node1")
|
||||
|
||||
node1Address, err := node1.ListenAddresses()
|
||||
require.NoError(t, err, "Failed to get listening address for Node1")
|
||||
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Relay = true
|
||||
node2Config.Staticnodes = []string{node1Address[0].String()}
|
||||
|
||||
Debug("Creating Node2 with Node1 as a static node")
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err, "Failed to start Node2")
|
||||
|
||||
node2Address, err := node2.ListenAddresses()
|
||||
require.NoError(t, err, "Failed to get listening address for Node2")
|
||||
|
||||
node3Config := DefaultWakuConfig
|
||||
node3Config.Relay = true
|
||||
node3Config.Staticnodes = []string{node2Address[0].String()}
|
||||
|
||||
Debug("Creating Node3 with Node2 as a static node")
|
||||
node3, err := StartWakuNode("Node3", &node3Config)
|
||||
require.NoError(t, err, "Failed to start Node3")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying all Waku nodes")
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
}()
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic)
|
||||
|
||||
Debug("Waiting for nodes to connect via static node configuration")
|
||||
err = WaitForAutoConnection([]*WakuNode{node1, node2, node3})
|
||||
require.NoError(t, err, "Nodes did not connect within timeout")
|
||||
|
||||
Debug("Waiting for peer connections to stabilize")
|
||||
options := func(b *backoff.ExponentialBackOff) {
|
||||
b.MaxElapsedTime = 10 * time.Second // Only set the max wait time
|
||||
}
|
||||
require.NoError(t, RetryWithBackOff(func() error {
|
||||
numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numPeers != 2 {
|
||||
return fmt.Errorf("expected 2 relay peers, got %d", numPeers)
|
||||
}
|
||||
return nil
|
||||
}, options), "Peers did not stabilize in time")
|
||||
|
||||
Debug("Stopping Node3")
|
||||
node3.StopAndDestroy()
|
||||
|
||||
Debug("Waiting for network to update after Node3 stops")
|
||||
require.NoError(t, RetryWithBackOff(func() error {
|
||||
numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numPeers != 1 {
|
||||
return fmt.Errorf("expected 1 relay peer after stopping Node3, got %d", numPeers)
|
||||
}
|
||||
return nil
|
||||
}, options), "Peer count did not update after stopping Node3")
|
||||
|
||||
Debug("Test successfully verified peer count changes as expected after stopping Node3")
|
||||
}
|
||||
|
||||
func TestRelaySubscribeFailsWhenRelayDisabled(t *testing.T) {
|
||||
Debug("Starting test to verify that subscribing to a topic fails when Relay is disabled")
|
||||
|
||||
nodeConfig := DefaultWakuConfig
|
||||
nodeConfig.Relay = false
|
||||
|
||||
Debug("Creating Node with Relay disabled")
|
||||
node, err := StartWakuNode("TestNode", &nodeConfig)
|
||||
require.NoError(t, err, "Failed to start Node")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying the Waku node")
|
||||
node.StopAndDestroy()
|
||||
}()
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
Debug("Attempting to subscribe to the default pubsub topic: %s", defaultPubsubTopic)
|
||||
|
||||
err = node.RelaySubscribe(defaultPubsubTopic)
|
||||
|
||||
Debug("Verifying that subscription failed")
|
||||
require.Error(t, err, "Expected RelaySubscribe to return an error when Relay is disabled")
|
||||
|
||||
Debug("Test successfully verified that RelaySubscribe fails when Relay is disabled")
|
||||
}
|
||||
|
||||
func TestRelayDisabledNodeDoesNotReceiveMessages(t *testing.T) {
|
||||
Debug("Starting test to verify that a node with Relay disabled does not receive messages")
|
||||
|
||||
node1Config := DefaultWakuConfig
|
||||
node1Config.Relay = true
|
||||
|
||||
Debug("Creating Node1 with Relay enabled")
|
||||
node1, err := StartWakuNode("Node1", &node1Config)
|
||||
require.NoError(t, err, "Failed to start Node1")
|
||||
|
||||
enrNode1, err := node1.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node1")
|
||||
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Relay = true
|
||||
node2Config.Discv5BootstrapNodes = []string{enrNode1.String()}
|
||||
|
||||
Debug("Creating Node2 with Node1 as Discv5 bootstrap")
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err, "Failed to start Node2")
|
||||
|
||||
enrNode2, err := node2.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node2")
|
||||
|
||||
node3Config := DefaultWakuConfig
|
||||
node3Config.Relay = false
|
||||
node3Config.Discv5BootstrapNodes = []string{enrNode2.String()}
|
||||
|
||||
Debug("Creating Node3 with Node2 as Discv5 bootstrap")
|
||||
node3, err := StartWakuNode("Node3", &node3Config)
|
||||
require.NoError(t, err, "Failed to start Node3")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying all Waku nodes")
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
node3.StopAndDestroy()
|
||||
}()
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic)
|
||||
|
||||
err = SubscribeNodesToTopic([]*WakuNode{node1, node2}, defaultPubsubTopic)
|
||||
require.NoError(t, err, "Failed to subscribe nodes to the topic")
|
||||
|
||||
Debug("Waiting for nodes to auto-connect via Discv5")
|
||||
err = WaitForAutoConnection([]*WakuNode{node1, node2})
|
||||
require.NoError(t, err, "Nodes did not auto-connect within timeout")
|
||||
|
||||
Debug("Creating and publishing message from Node1")
|
||||
message := node1.CreateMessage()
|
||||
msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message)
|
||||
require.NoError(t, err, "Failed to publish message from Node1")
|
||||
|
||||
Debug("Waiting to ensure message delivery")
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
Debug("Verifying that Node2 received the message")
|
||||
err = node2.VerifyMessageReceived(message, msgHash)
|
||||
require.NoError(t, err, "Node2 should have received the message")
|
||||
|
||||
Debug("Verifying that Node3 did NOT receive the message")
|
||||
err = node3.VerifyMessageReceived(message, msgHash)
|
||||
require.Error(t, err, "Node3 should NOT have received the message")
|
||||
|
||||
Debug("Test successfully verified that Node3 did not receive the message")
|
||||
}
|
||||
|
||||
func TestPublishWithLargePayload(t *testing.T) {
|
||||
Debug("Starting test to verify message publishing with a payload close to 150KB")
|
||||
|
||||
node1Config := DefaultWakuConfig
|
||||
node1Config.Relay = true
|
||||
|
||||
Debug("Creating Node1 with Relay enabled")
|
||||
node1, err := StartWakuNode("Node1", &node1Config)
|
||||
require.NoError(t, err, "Failed to start Node1")
|
||||
|
||||
enrNode1, err := node1.ENR()
|
||||
require.NoError(t, err, "Failed to get ENR for Node1")
|
||||
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Relay = true
|
||||
node2Config.Discv5BootstrapNodes = []string{enrNode1.String()}
|
||||
|
||||
Debug("Creating Node2 with Node1 as Discv5 bootstrap")
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err, "Failed to start Node2")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying all Waku nodes")
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
}()
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic)
|
||||
|
||||
err = SubscribeNodesToTopic([]*WakuNode{node1, node2}, defaultPubsubTopic)
|
||||
require.NoError(t, err, "Failed to subscribe nodes to the topic")
|
||||
|
||||
Debug("Waiting for nodes to auto-connect via Discv5")
|
||||
err = WaitForAutoConnection([]*WakuNode{node1, node2})
|
||||
require.NoError(t, err, "Nodes did not auto-connect within timeout")
|
||||
|
||||
payloadLength := 1024 * 100 // 100KB raw, approximately 150KB when base64 encoded
|
||||
Debug("Generating a large payload of %d bytes", payloadLength)
|
||||
|
||||
largePayload := make([]byte, payloadLength)
|
||||
for i := range largePayload {
|
||||
largePayload[i] = 'a'
|
||||
}
|
||||
|
||||
message := node1.CreateMessage(&pb.WakuMessage{
|
||||
Payload: largePayload,
|
||||
ContentTopic: "test-content-topic",
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
})
|
||||
|
||||
Debug("Publishing message from Node1 with large payload")
|
||||
msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message)
|
||||
require.NoError(t, err, "Failed to publish message from Node1")
|
||||
|
||||
Debug("Waiting to ensure message propagation")
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
Debug("Verifying that Node2 received the message")
|
||||
err = node2.VerifyMessageReceived(message, msgHash)
|
||||
require.NoError(t, err, "Node2 should have received the message")
|
||||
|
||||
Debug("Test successfully verified message publishing with a large payload")
|
||||
}
|
||||
@ -10,14 +10,7 @@ var DefaultWakuConfig common.WakuConfig
|
||||
|
||||
func init() {
|
||||
|
||||
udpPort, _, err1 := GetFreePortIfNeeded(0, 0)
|
||||
tcpPort, _, err2 := GetFreePortIfNeeded(0, 0)
|
||||
|
||||
if err1 != nil || err2 != nil {
|
||||
Error("Failed to get free ports %v %v", err1, err2)
|
||||
}
|
||||
|
||||
DefaultWakuConfig = common.WakuConfig{
|
||||
DefaultWakuConfig = WakuConfig{
|
||||
Relay: false,
|
||||
LogLevel: "DEBUG",
|
||||
Discv5Discovery: true,
|
||||
@ -27,12 +20,13 @@ func init() {
|
||||
Store: false,
|
||||
Filter: false,
|
||||
Lightpush: false,
|
||||
Discv5UdpPort: udpPort,
|
||||
TcpPort: tcpPort,
|
||||
Discv5UdpPort: 0,
|
||||
TcpPort: 0,
|
||||
}
|
||||
}
|
||||
|
||||
const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node
|
||||
const DefaultTimeOut = 3 * time.Second
|
||||
|
||||
var DefaultPubsubTopic = "/waku/2/rs/16/64"
|
||||
var (
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user