diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go new file mode 100644 index 0000000..43489c8 --- /dev/null +++ b/waku/nodes_basic_test.go @@ -0,0 +1,68 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBasicWakuNodes(t *testing.T) { + Debug("Starting TestBasicWakuNodes") + + nodeCfg := DefaultWakuConfig + nodeCfg.Relay = true + + Debug("Starting the WakuNode") + node, err := StartWakuNode("node", &nodeCfg) + require.NoError(t, err, "Failed to create the WakuNode") + + // Use defer to ensure proper cleanup + defer func() { + node.StopAndDestroy() + }() + + Debug("Successfully created the WakuNode") + time.Sleep(2 * time.Second) + + Debug("TestBasicWakuNodes completed successfully") +} + +func TestNodeRestart(t *testing.T) { + Debug("Starting TestNodeRestart") + + Debug("Creating Node") + nodeConfig := DefaultWakuConfig + node, err := StartWakuNode("TestNode", &nodeConfig) + require.NoError(t, err, "Failed to start Waku node") + defer node.StopAndDestroy() + + Debug("Node started successfully") + + Debug("Fetching ENR before stopping the node") + enrBefore, err := node.ENR() + require.NoError(t, err, "Failed to get ENR before stopping") + require.NotEmpty(t, enrBefore, "ENR should not be empty before stopping") + Debug("ENR before stopping: %s", enrBefore) + + Debug("Stopping the Node") + err = node.Stop() + require.NoError(t, err, "Failed to stop Waku node") + Debug("Node stopped successfully") + + Debug("Restarting the Node") + err = node.Start() + require.NoError(t, err, "Failed to restart Waku node") + Debug("Node restarted successfully") + + Debug("Fetching ENR after restarting the node") + enrAfter, err := node.ENR() + require.NoError(t, err, "Failed to get ENR after restarting") + require.NotEmpty(t, enrAfter, "ENR should not be empty after restart") + Debug("ENR after restarting: %s", enrAfter) + + Debug("Comparing ENRs before and after restart") + require.Equal(t, enrBefore, enrAfter, "ENR should remain the same after node restart") + + Debug("TestNodeRestart completed successfully") +} diff --git a/waku/nwaku.go b/waku/nwaku.go index f2cdd9a..03133da 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -617,11 +617,13 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelaySubscribe: " + errMsg) } func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error { @@ -657,7 +659,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to unsubscribe from relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -673,15 +677,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { } wg.Add(1) + Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelayUnsubscribe: " + errMsg) } func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { @@ -697,16 +705,20 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) if err != nil { + Error("Failed to parse number of received peers: %v", err) return 0, err } return numRecvPeers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("PeerExchangeRequest failed: %v", errMsg) return 0, errors.New(errMsg) } func (n *WakuNode) StartDiscV5() error { + + Debug("Starting DiscV5 for node: %s", n.nodeName) wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -716,9 +728,11 @@ func (n *WakuNode) StartDiscV5() error { C.cGoWakuStartDiscV5(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully started DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -733,9 +747,11 @@ func (n *WakuNode) StopDiscV5() error { wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -751,11 +767,13 @@ func (n *WakuNode) Version() (string, error) { if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) return version, nil } errMsg := "error WakuVersion: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) return "", errors.New(errMsg) } @@ -832,6 +850,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu return common.MessageHash(""), errors.New(errMsg) } +func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) { + if n == nil { + err := errors.New("cannot publish message; node is nil") + Error("Failed to publish message via relay: %v", err) + return "", err + } + + // Handling context internally with a timeout + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + + Debug("Attempting to publish message via relay on node %s", n.nodeName) + + msgHash, err := n.RelayPublish(ctx, message, pubsubTopic) + if err != nil { + Error("Failed to publish message via relay on node %s: %v", n.nodeName, err) + return "", err + } + + Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String()) + return msgHash, nil +} + func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} @@ -956,7 +997,7 @@ func (n *WakuNode) Destroy() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - Debug("Successfully destroyed " + n.nodeName) + Debug("Successfully destroyed %s", n.nodeName) return nil } @@ -1284,6 +1325,19 @@ func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, er nodeCfg = *customCfg } + tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 + } + + if nodeCfg.TcpPort == 0 { + nodeCfg.TcpPort = tcpPort + } + if nodeCfg.Discv5UdpPort == 0 { + nodeCfg.Discv5UdpPort = udpPort + } + Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) if err != nil { @@ -1302,6 +1356,7 @@ func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, er } func (n *WakuNode) StopAndDestroy() error { + Debug("Stopping and destroying Node") if n == nil { err := errors.New("waku node is nil") Error("Failed to stop and destroy: %v", err) diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index bb923fe..5b1bc96 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -1,7 +1,9 @@ package waku import ( + "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -10,6 +12,9 @@ import ( "time" "github.com/cenkalti/backoff/v3" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" ) type NwakuInfo struct { @@ -77,3 +82,136 @@ func RetryWithBackOff(o func() error, options ...BackOffOption) error { b.Reset() return backoff.Retry(o, &b) } + +func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { + Debug("Creating a WakuMessage on node %s", n.nodeName) + + if len(customMessage) > 0 && customMessage[0] != nil { + Debug("Using provided custom message on node %s", n.nodeName) + return customMessage[0] + } + + Debug("Using default message format on node %s", n.nodeName) + defaultMessage := &pb.WakuMessage{ + Payload: []byte("This is a default Waku message payload"), + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + Debug("Successfully created a default WakuMessage on node %s", n.nodeName) + return defaultMessage +} + +func WaitForAutoConnection(nodeList []*WakuNode) error { + Debug("Waiting for auto-connection of nodes...") + + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + + err := RetryWithBackOff(func() error { + for _, node := range nodeList { + peers, err := node.GetConnectedPeers() + if err != nil { + return err + } + + if len(peers) < 1 { + return errors.New("expected at least one connected peer") // Retry + } + + Debug("Node %s has %d connected peers", node.nodeName, len(peers)) + } + + return nil + }, options) + + if err != nil { + Error("Auto-connection failed after retries: %v", err) + return err + } + + Debug("Auto-connection check completed successfully") + return nil +} + +func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash, timeout ...time.Duration) error { + + var verifyTimeout time.Duration + if len(timeout) > 0 { + verifyTimeout = timeout[0] + } else { + verifyTimeout = DefaultTimeOut + } + + Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, verifyTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), verifyTimeout) + defer cancel() + + select { + case envelope := <-n.MsgChan: + if envelope == nil { + Error("Received envelope is nil on node %s", n.nodeName) + return errors.New("received envelope is nil") + } + if string(expectedMessage.Payload) != string(envelope.Message().Payload) { + Error("Payload does not match on node %s", n.nodeName) + return errors.New("payload does not match") + } + if expectedMessage.ContentTopic != envelope.Message().ContentTopic { + Error("Content topic does not match on node %s", n.nodeName) + return errors.New("content topic does not match") + } + if expectedHash != envelope.Hash() { + Error("Message hash does not match on node %s", n.nodeName) + return errors.New("message hash does not match") + } + Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) + return nil + case <-ctx.Done(): + Error("Timeout: message not received within %v on node %s", verifyTimeout, n.nodeName) + return errors.New("timeout: message not received within the given duration") + } +} + +func ConnectAllPeers(nodes []*WakuNode) error { + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + <-ctx.Done() + Debug("Connections stabilized") + return nil +} + +func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { + for _, node := range nodes { + Debug("Subscribing node %s to topic %s", node.nodeName, topic) + err := node.RelaySubscribe(topic) + + if err != nil { + Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) + return err + } + Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) + } + return nil +} diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go new file mode 100644 index 0000000..4971dc1 --- /dev/null +++ b/waku/peer_connections_test.go @@ -0,0 +1,227 @@ +package waku + +import ( + "slices" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Test node connect & disconnect peers +func TestDisconnectPeerNodes(t *testing.T) { + Debug("Starting TestDisconnectPeerNodes") + + nodeA, err := StartWakuNode("nodeA", nil) + require.NoError(t, err, "Failed to start Node A") + defer nodeA.StopAndDestroy() + + nodeB, err := StartWakuNode("nodeB", nil) + require.NoError(t, err, "Failed to start Node B") + defer nodeB.StopAndDestroy() + + Debug("Connecting Node A to Node B") + err = nodeA.ConnectPeer(nodeB) + require.NoError(t, err, "Failed to connect nodes") + + Debug("Verifying connection between Node A and Node B") + connectedPeers, err := nodeA.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node A") + nodeBPeerID, err := nodeB.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node B") + require.True(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should be a peer of Node A before disconnection") + + Debug("Disconnecting Node A from Node B") + err = nodeA.DisconnectPeer(nodeB) + require.NoError(t, err, "Failed to disconnect nodes") + + Debug("Verifying disconnection between Node A and Node B") + connectedPeers, err = nodeA.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node A after disconnection") + require.False(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should no longer be a peer of Node A after disconnection") + Debug("Test completed successfully: Node B was disconnected from Node A") +} + +func TestConnectMultipleNodesToSingleNode(t *testing.T) { + Debug("Starting TestConnectMultipleNodesToSingleNode") + + Debug("Creating 3 nodes with automatically assigned ports") + + node1, err := StartWakuNode("node1", nil) + require.NoError(t, err, "Failed to start Node 1") + defer func() { + Debug("Stopping and destroying Node 1") + node1.StopAndDestroy() + }() + + node2, err := StartWakuNode("node2", nil) + require.NoError(t, err, "Failed to start Node 2") + defer func() { + Debug("Stopping and destroying Node 2") + node2.StopAndDestroy() + }() + + node3, err := StartWakuNode("node3", nil) + require.NoError(t, err, "Failed to start Node 3") + defer func() { + Debug("Stopping and destroying Node 3") + node3.StopAndDestroy() + }() + + Debug("Connecting Node 2 to Node 1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node 2 to Node 1") + + Debug("Connecting Node 3 to Node 1") + err = node3.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node 3 to Node 1") + + Debug("Verifying connected peers for Node 1") + connectedPeers, err := node1.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node 1") + node3PeerID, err := node3.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 1") + node2PeerID, err := node2.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 2") + + require.True(t, slices.Contains(connectedPeers, node3PeerID), "Node 3 should be a peer of Node 1") + require.True(t, slices.Contains(connectedPeers, node2PeerID), "Node 2 should be a peer of Node 1") + + Debug("Test completed successfully: multiple nodes connected to a single node and verified peers") +} + +func TestDiscv5PeerMeshCount(t *testing.T) { + Debug("Starting test to verify peer count in mesh using Discv5 after topic subscription") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node2Config.Relay = true + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node3Config.Relay = true + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe all nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in mesh for Node1 before stopping Node3") + peerCountBefore, err := node1.GetNumPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3") + + Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", peerCountBefore) + require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3") + + Debug("Stopping Node3") + node3.StopAndDestroy() + + Debug("Waiting for network update after Node3 stops") + time.Sleep(10 * time.Second) + + Debug("Fetching number of peers in mesh for Node1 after stopping Node3") + peerCountAfter, err := node1.GetNumPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3") + + Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", peerCountAfter) + require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3") + + Debug("Test successfully verified peer count change after stopping Node3") +} + +// this test commented as it will fail will be changed to have external ip in future task +/* +func TestDiscv5GetPeersConnected(t *testing.T) { + Debug("Starting test to verify peer count in mesh with 4 nodes using Discv5 (Chained Connection)") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + enrNode2, err := node2.ENR() + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + enrNode3, err := node3.ENR() + require.NoError(t, err, "Failed to get ENR for Node3") + + node4Config := DefaultWakuConfig + node4Config.Relay = true + node4Config.Discv5BootstrapNodes = []string{enrNode3.String()} + + Debug("Creating Node4 with Node3 as Discv5 bootstrap") + node4, err := StartWakuNode("Node4", &node4Config) + require.NoError(t, err, "Failed to start Node4") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + node4.StopAndDestroy() + }() + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3, node4}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in connected to Node1") + peerCount, err := node1.GetNumConnectedPeers() + require.NoError(t, err, "Failed to get number of peers in mesh for Node1") + + Debug("Total number of peers connected to Node1: %d", peerCount) + require.Equal(t, 3, peerCount, "Expected Node1 to have exactly 3 peers in the mesh") + + Debug("Test successfully verified peer count in mesh with 4 nodes using Discv5 (Chained Connection)") +} +*/ \ No newline at end of file diff --git a/waku/relay_test.go b/waku/relay_test.go new file mode 100644 index 0000000..0315029 --- /dev/null +++ b/waku/relay_test.go @@ -0,0 +1,509 @@ +package waku + +import ( + "fmt" + "testing" + "time" + + "github.com/cenkalti/backoff/v3" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" +) + +func TestVerifyNumConnectedRelayPeers(t *testing.T) { + + node1Cfg := DefaultWakuConfig + node1Cfg.Relay = true + node1, err := StartWakuNode("node1", &node1Cfg) + if err != nil { + t.Fatalf("Failed to start node1: %v", err) + } + node2Cfg := DefaultWakuConfig + node2Cfg.Relay = true + node2, err := StartWakuNode("node2", &node2Cfg) + if err != nil { + t.Fatalf("Failed to start node2: %v", err) + } + + node3, err := StartWakuNode("node3", nil) + if err != nil { + t.Fatalf("Failed to start node3: %v", err) + } + + defer func() { + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + err = node2.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node2 to node1: %v", err) + } + + err = node3.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node3 to node1: %v", err) + } + connectedPeersNode1, err := node1.GetConnectedPeers() + if err != nil { + t.Fatalf("Failed to get connected peers for node1: %v", err) + } + if len(connectedPeersNode1) != 2 { + t.Fatalf("Expected 2 connected peers on node1, but got %d", len(connectedPeersNode1)) + } + + numRelayPeers, err := node1.GetNumConnectedRelayPeers() + if err != nil { + t.Fatalf("Failed to get connected relay peers for node1: %v", err) + } + if numRelayPeers != 1 { + t.Fatalf("Expected 1 relay peer on node1, but got %d", numRelayPeers) + } + + t.Logf("Successfully connected node2 and node3 to node1. Relay Peers: %d, Total Peers: %d", numRelayPeers, len(connectedPeersNode1)) +} + +func TestRelayMessageTransmission(t *testing.T) { + Debug("Starting TestRelayMessageTransmission") + + Debug("Creating Sender Node with Relay enabled") + senderConfig := DefaultWakuConfig + senderConfig.Relay = true + + senderNode, err := StartWakuNode("SenderNode", &senderConfig) + require.NoError(t, err, "Failed to start SenderNode") + defer senderNode.StopAndDestroy() + + Debug("Creating Receiver Node with Relay enabled") + receiverConfig := DefaultWakuConfig + receiverConfig.Relay = true + + // Set the Receiver Node's discovery bootstrap node as SenderNode + enrSender, err := senderNode.ENR() + require.NoError(t, err, "Failed to get ENR for SenderNode") + receiverConfig.Discv5BootstrapNodes = []string{enrSender.String()} + + receiverNode, err := StartWakuNode("ReceiverNode", &receiverConfig) + require.NoError(t, err, "Failed to start ReceiverNode") + defer receiverNode.StopAndDestroy() + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Creating and publishing message") + message := senderNode.CreateMessage() + var msgHash string + + err = RetryWithBackOff(func() error { + var err error + msgHashObj, err := senderNode.RelayPublishNoCTX(DefaultPubsubTopic, message) + if err == nil { + msgHash = msgHashObj.String() + } + return err + }) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying message reception") + err = RetryWithBackOff(func() error { + msgHashObj, _ := common.ToMessageHash(msgHash) + return receiverNode.VerifyMessageReceived(message, msgHashObj) + }) + require.NoError(t, err, "Message verification failed") + + Debug("TestRelayMessageTransmission completed successfully") +} + +func TestRelayMessageBroadcast(t *testing.T) { + Debug("Starting TestRelayMessageBroadcast") + + numPeers := 5 + nodes := make([]*WakuNode, numPeers) + nodeNames := []string{"SenderNode", "PeerNode1", "PeerNode2", "PeerNode3", "PeerNode4"} + defaultPubsubTopic := DefaultPubsubTopic + + for i := 0; i < numPeers; i++ { + Debug("Creating node %s", nodeNames[i]) + nodeConfig := DefaultWakuConfig + nodeConfig.Relay = true + if i > 0 { + enrPrevNode, err := nodes[i-1].ENR() + require.NoError(t, err, "Failed to get ENR for node %s", nodeNames[i-1]) + nodeConfig.Discv5BootstrapNodes = []string{enrPrevNode.String()} + } + node, err := StartWakuNode(nodeNames[i], &nodeConfig) + require.NoError(t, err) + defer node.StopAndDestroy() + nodes[i] = node + } + + WaitForAutoConnection(nodes) + + senderNode := nodes[0] + Debug("SenderNode is publishing a message") + message := senderNode.CreateMessage() + msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Waiting to ensure message delivery") + time.Sleep(3 * time.Second) + + Debug("Verifying message reception for each node") + for i, node := range nodes { + Debug("Verifying message for node %s", nodeNames[i]) + err := node.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "message verification failed for node: %s", nodeNames[i]) + } + + Debug("TestRelayMessageBroadcast completed successfully") +} + +func TestSendmsgInvalidPayload(t *testing.T) { + Debug("Starting TestInvalidMessageFormat") + + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Creating nodes") + senderNodeConfig := DefaultWakuConfig + senderNodeConfig.Relay = true + senderNode, err := StartWakuNode("SenderNode", &senderNodeConfig) + require.NoError(t, err) + defer senderNode.StopAndDestroy() + + receiverNodeConfig := DefaultWakuConfig + receiverNodeConfig.Relay = true + enrNode2, err := senderNode.ENR() + if err != nil { + require.Error(t, err, "Can't find node ENR") + } + receiverNodeConfig.Discv5BootstrapNodes = []string{enrNode2.String()} + receiverNode, err := StartWakuNode("receiverNode", &receiverNodeConfig) + require.NoError(t, err) + defer receiverNode.StopAndDestroy() + + err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("SenderNode is publishing an invalid message") + invalidMessage := &pb.WakuMessage{ + Payload: []byte{}, + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + message := senderNode.CreateMessage(invalidMessage) + var msgHash common.MessageHash + msgHash, err = senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + + Debug("Verifying if message was sent or failed") + if err != nil { + Debug("Message was not sent due to invalid format: %v", err) + + } else { + Debug("Message was unexpectedly sent: %s", msgHash.String()) + require.Fail(t, "message with invalid format should not be sent") + } + + Debug("TestInvalidMessageFormat completed") +} + +func TestRelayNodesNotConnectedDirectly(t *testing.T) { + Debug("Starting TestRelayNodesNotConnectedDirectly") + + Debug("Creating Sender Node with Relay enabled") + senderConfig := DefaultWakuConfig + senderConfig.Relay = true + senderNode, err := StartWakuNode("SenderNode", &senderConfig) + require.NoError(t, err) + defer senderNode.StopAndDestroy() + + Debug("Creating Relay-Enabled Receiver Node (Node2)") + node2Config := DefaultWakuConfig + node2Config.Relay = true + + // Use static nodes instead of ENR + node1Address, err := senderNode.ListenAddresses() + require.NoError(t, err, "Failed to get sender node address") + node2Config.Staticnodes = []string{node1Address[0].String()} + + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err) + defer node2.StopAndDestroy() + + Debug("Creating Relay-Enabled Receiver Node (Node3)") + node3Config := DefaultWakuConfig + node3Config.Relay = true + + // Use static nodes instead of Discv5 + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get node2 address") + node3Config.Staticnodes = []string{node2Address[0].String()} + + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err) + defer node3.StopAndDestroy() + + Debug("Waiting for nodes to connect before proceeding") + err = WaitForAutoConnection([]*WakuNode{senderNode, node2, node3}) + require.NoError(t, err, "Nodes did not connect within timeout") + + Debug("SenderNode is publishing a message") + message := senderNode.CreateMessage() + msgHash, err := senderNode.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Verifying that Node3 received the message") + err = node3.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node3 should have received the message") + + Debug("TestRelayNodesNotConnectedDirectly completed successfully") +} + +func TestRelaySubscribeAndPeerCountChange(t *testing.T) { + Debug("Starting test to verify relay subscription and peer count change after stopping a node") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node1Address, err := node1.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Staticnodes = []string{node1Address[0].String()} + + Debug("Creating Node2 with Node1 as a static node") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Staticnodes = []string{node2Address[0].String()} + + Debug("Creating Node3 with Node2 as a static node") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + Debug("Waiting for nodes to connect via static node configuration") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not connect within timeout") + + Debug("Waiting for peer connections to stabilize") + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 10 * time.Second // Only set the max wait time + } + require.NoError(t, RetryWithBackOff(func() error { + numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + if err != nil { + return err + } + if numPeers != 2 { + return fmt.Errorf("expected 2 relay peers, got %d", numPeers) + } + return nil + }, options), "Peers did not stabilize in time") + + Debug("Stopping Node3") + node3.StopAndDestroy() + + Debug("Waiting for network to update after Node3 stops") + require.NoError(t, RetryWithBackOff(func() error { + numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + if err != nil { + return err + } + if numPeers != 1 { + return fmt.Errorf("expected 1 relay peer after stopping Node3, got %d", numPeers) + } + return nil + }, options), "Peer count did not update after stopping Node3") + + Debug("Test successfully verified peer count changes as expected after stopping Node3") +} + +func TestRelaySubscribeFailsWhenRelayDisabled(t *testing.T) { + Debug("Starting test to verify that subscribing to a topic fails when Relay is disabled") + + nodeConfig := DefaultWakuConfig + nodeConfig.Relay = false + + Debug("Creating Node with Relay disabled") + node, err := StartWakuNode("TestNode", &nodeConfig) + require.NoError(t, err, "Failed to start Node") + + defer func() { + Debug("Stopping and destroying the Waku node") + node.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Attempting to subscribe to the default pubsub topic: %s", defaultPubsubTopic) + + err = node.RelaySubscribe(defaultPubsubTopic) + + Debug("Verifying that subscription failed") + require.Error(t, err, "Expected RelaySubscribe to return an error when Relay is disabled") + + Debug("Test successfully verified that RelaySubscribe fails when Relay is disabled") +} + +func TestRelayDisabledNodeDoesNotReceiveMessages(t *testing.T) { + Debug("Starting test to verify that a node with Relay disabled does not receive messages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + enrNode2, err := node2.ENR() + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Creating and publishing message from Node1") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting to ensure message delivery") + time.Sleep(3 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Verifying that Node3 did NOT receive the message") + err = node3.VerifyMessageReceived(message, msgHash) + require.Error(t, err, "Node3 should NOT have received the message") + + Debug("Test successfully verified that Node3 did not receive the message") +} + +func TestPublishWithLargePayload(t *testing.T) { + Debug("Starting test to verify message publishing with a payload close to 150KB") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + payloadLength := 1024 * 100 // 100KB raw, approximately 150KB when base64 encoded + Debug("Generating a large payload of %d bytes", payloadLength) + + largePayload := make([]byte, payloadLength) + for i := range largePayload { + largePayload[i] = 'a' + } + + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: largePayload, + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + Debug("Publishing message from Node1 with large payload") + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting to ensure message propagation") + time.Sleep(2 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Test successfully verified message publishing with a large payload") +} diff --git a/waku/test_data.go b/waku/test_data.go index 121e1f2..67125d1 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -10,14 +10,7 @@ var DefaultWakuConfig common.WakuConfig func init() { - udpPort, _, err1 := GetFreePortIfNeeded(0, 0) - tcpPort, _, err2 := GetFreePortIfNeeded(0, 0) - - if err1 != nil || err2 != nil { - Error("Failed to get free ports %v %v", err1, err2) - } - - DefaultWakuConfig = common.WakuConfig{ + DefaultWakuConfig = WakuConfig{ Relay: false, LogLevel: "DEBUG", Discv5Discovery: true, @@ -27,12 +20,13 @@ func init() { Store: false, Filter: false, Lightpush: false, - Discv5UdpPort: udpPort, - TcpPort: tcpPort, + Discv5UdpPort: 0, + TcpPort: 0, } } const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node +const DefaultTimeOut = 3 * time.Second var DefaultPubsubTopic = "/waku/2/rs/16/64" var (