diff --git a/testlibs/src/Relay_wrapper.go b/testlibs/src/Relay_wrapper.go new file mode 100644 index 0000000..b43b4dd --- /dev/null +++ b/testlibs/src/Relay_wrapper.go @@ -0,0 +1,64 @@ +package testlibs + +import ( + "errors" + + utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities" + "go.uber.org/zap" +) + +func (wrapper *WakuNodeWrapper) Wrappers_RelaySubscribe(pubsubTopic string) error { + utilities.Debug("Attempting to subscribe to relay topic", zap.String("topic", pubsubTopic)) + + if wrapper.WakuNode == nil { + err := errors.New("WakuNode is nil in WakuNodeWrapper") + utilities.Error("Cannot subscribe; node is nil", zap.Error(err)) + return err + } + + err := wrapper.WakuNode.RelaySubscribe(pubsubTopic) + if err != nil { + utilities.Error("Failed to subscribe to relay topic", zap.Error(err)) + return err + } + + // Ensure the subscription happened by checking the number of connected relay peers + numRelayPeers, err := wrapper.Wrappers_GetNumConnectedRelayPeers(pubsubTopic) + if err != nil || numRelayPeers == 0 { + utilities.Error("Subscription verification failed: no connected relay peers found", zap.Error(err)) + return errors.New("subscription verification failed: no connected relay peers") + } + + utilities.Debug("Successfully subscribed to relay topic", zap.String("topic", pubsubTopic)) + return nil +} + +func (wrapper *WakuNodeWrapper) Wrappers_RelayUnsubscribe(pubsubTopic string) error { + utilities.Debug("Attempting to unsubscribe from relay topic", zap.String("topic", pubsubTopic)) + + if wrapper.WakuNode == nil { + err := errors.New("WakuNode is nil in WakuNodeWrapper") + utilities.Error("Cannot unsubscribe; node is nil", zap.Error(err)) + return err + } + + err := wrapper.WakuNode.RelayUnsubscribe(pubsubTopic) + if err != nil { + utilities.Error("Failed to unsubscribe from relay topic", zap.Error(err)) + return err + } + + // Ensure the unsubscription happened by verifying the relay peers count + numRelayPeers, err := wrapper.Wrappers_GetNumConnectedRelayPeers(pubsubTopic) + if err != nil { + utilities.Error("Failed to verify unsubscription from relay topic", zap.Error(err)) + return err + } + if numRelayPeers > 0 { + utilities.Error("Unsubscription verification failed: relay peers still connected", zap.Int("relayPeers", numRelayPeers)) + return errors.New("unsubscription verification failed: relay peers still connected") + } + + utilities.Debug("Successfully unsubscribed from relay topic", zap.String("topic", pubsubTopic)) + return nil +} diff --git a/testlibs/src/testing-wrappers.go b/testlibs/src/testing-wrappers.go index 8fe38f7..a27a436 100644 --- a/testlibs/src/testing-wrappers.go +++ b/testlibs/src/testing-wrappers.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/libp2p/go-libp2p/core/peer" + utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities" "github.com/waku-org/waku-go-bindings/waku" "go.uber.org/zap" @@ -115,6 +117,33 @@ func (wrapper *WakuNodeWrapper) Wrappers_StopAndDestroy() error { return nil } +func (wrapper *WakuNodeWrapper) Wrappers_GetConnectedPeers() ([]peer.ID, error) { + if wrapper.WakuNode == nil { + err := errors.New("WakuNode is nil in WakuNodeWrapper") + utilities.Error("Cannot proceed; node is nil", zap.Error(err)) + return nil, err + } + + peerID, err := wrapper.WakuNode.PeerID() + if err != nil { + utilities.Error("Failed to get PeerID of node", zap.Error(err)) + return nil, err + } + + utilities.Debug("Getting number of connected peers to node", zap.String("node", peerID.String())) + + peers, err := wrapper.WakuNode.GetConnectedPeers() + if err != nil { + utilities.Error("Failed to get connected peers", zap.Error(err)) + return nil, err + } + + utilities.Debug("Successfully fetched connected peers", + zap.Int("count", len(peers)), + ) + return peers, nil +} + func (wrapper *WakuNodeWrapper) Wrappers_GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { utilities.Debug("Wrappers_GetNumConnectedRelayPeers called") @@ -137,62 +166,122 @@ func (wrapper *WakuNodeWrapper) Wrappers_GetNumConnectedRelayPeers(optPubsubTopi return numPeers, nil } -func (w *WakuNodeWrapper) Wrappers_ConnectPeer(target *WakuNodeWrapper) error { - if w.WakuNode == nil { +func (wrapper *WakuNodeWrapper) Wrappers_ConnectPeer(targetNode *WakuNodeWrapper) error { + + utilities.Debug("Connect node to peer") + if wrapper.WakuNode == nil { err := errors.New("WakuNode is nil in caller") utilities.Error("Cannot call Connect; caller node is nil", zap.Error(err)) return err } - if target == nil || target.WakuNode == nil { - err := errors.New("target WakuNode is nil") + if targetNode == nil || targetNode.WakuNode == nil { + err := errors.New("WakuNode is nil in target") utilities.Error("Cannot connect; target node is nil", zap.Error(err)) return err } - addrs, err := target.ListenAddresses() - if err != nil || len(addrs) == 0 { - errMsg := "failed to obtain target node's listening addresses" - utilities.Error(errMsg, zap.String("error", err.Error())) - return errors.New(errMsg) + targetPeerID, err := targetNode.WakuNode.PeerID() + if err != nil { + utilities.Error("Failed to get PeerID of target node", zap.Error(err)) + return err } - peerAddr := addrs[0] + utilities.Debug("Get connected peers before attempting to connect") - utilities.Debug("Wrappers_ConnectPeer called", zap.String("targetAddr", peerAddr.String())) + connectedPeersBefore, err := wrapper.Wrappers_GetConnectedPeers() + if err != nil { + utilities.Debug("Could not fetch connected peers before connecting (might be none yet)", zap.Error(err)) + } else { + utilities.Debug("Connected peers before connecting", zap.Int("count", len(connectedPeersBefore))) + } + utilities.Debug("Attempt to connect to the target node") ctx, cancel := context.WithTimeout(context.Background(), utilities.ConnectPeerTimeout) defer cancel() - utilities.Debug("Connecting to peer with address", zap.String("address", peerAddr.String())) - err = w.WakuNode.Connect(ctx, peerAddr) + targetAddr, err := targetNode.WakuNode.ListenAddresses() + if err != nil || len(targetAddr) == 0 { + utilities.Error("Failed to get listen addresses for target node", zap.Error(err)) + return errors.New("target node has no listen addresses") + } + + utilities.Debug("Connecting to peer", zap.String("address", targetAddr[0].String())) + err = wrapper.WakuNode.Connect(ctx, targetAddr[0]) if err != nil { - utilities.Error("Failed to connect", zap.Error(err)) + utilities.Error("Failed to connect to peer", zap.Error(err)) return err } - utilities.Debug("Successfully connected", zap.String("address", peerAddr.String())) + utilities.Debug("Get connected peers after attempting to connect") + connectedPeersAfter, err := wrapper.Wrappers_GetConnectedPeers() + if err != nil { + utilities.Error("Failed to get connected peers after connecting", zap.Error(err)) + return err + } + + utilities.Debug("Connected peers after connecting", zap.Int("count", len(connectedPeersAfter))) + + utilities.Debug("Check if the target peer is now connected") + isConnected := false + for _, peerID := range connectedPeersAfter { + if peerID == targetPeerID { + isConnected = true + break + } + } + + if !isConnected { + err := errors.New("failed to connect; target peer is not in connected peers list") + utilities.Error("Connect operation failed", zap.Error(err)) + return err + } + + utilities.Debug("Successfully connected to target peer", zap.String("targetPeerID", targetPeerID.String())) return nil } -func (wrapper *WakuNodeWrapper) Wrappers_DisconnectPeer(targetNode *WakuNodeWrapper) error { +func (wrapper *WakuNodeWrapper) Wrappers_DisconnectPeer(target *WakuNodeWrapper) error { + if wrapper.WakuNode == nil { - err := errors.New("the calling WakuNode is nil") - utilities.Error("Cannot disconnect; calling node is nil", zap.Error(err)) + err := errors.New("WakuNode is nil in caller") + utilities.Error("Cannot call Disconnect; caller node is nil", zap.Error(err)) return err } - if targetNode == nil || targetNode.WakuNode == nil { - err := errors.New("the target WakuNode is nil") + if target == nil || target.WakuNode == nil { + err := errors.New("target WakuNode is nil") utilities.Error("Cannot disconnect; target node is nil", zap.Error(err)) return err } - peerID, err := targetNode.WakuNode.PeerID() + utilities.Debug("Check if nodes are peers first") + + peerID, err := target.WakuNode.PeerID() if err != nil { - utilities.Error("Failed to retrieve peer ID from target node", zap.Error(err)) + utilities.Error("Failed to get PeerID of target node", zap.Error(err)) return err } - utilities.Debug("Wrappers_DisconnectPeer", zap.String("peerID", peerID.String())) + connectedPeers, err := wrapper.Wrappers_GetConnectedPeers() + if err != nil { + utilities.Error("Failed to get connected peers", zap.Error(err)) + return err + } + + isPeer := false + for _, connectedPeerID := range connectedPeers { + if connectedPeerID == peerID { + isPeer = true + break + } + } + + if !isPeer { + err = errors.New("nodes are not connected as peers") + utilities.Error("Cannot disconnect; nodes are not peers", zap.Error(err)) + return err + } + + utilities.Debug("Nodes are peers.. attempting to disconnect") err = wrapper.WakuNode.DisconnectPeerByID(peerID) if err != nil { utilities.Error("Failed to disconnect peer", zap.Error(err)) @@ -202,3 +291,5 @@ func (wrapper *WakuNodeWrapper) Wrappers_DisconnectPeer(targetNode *WakuNodeWrap utilities.Debug("Successfully disconnected peer", zap.String("peerID", peerID.String())) return nil } + + diff --git a/testlibs/utilities/test_data.go b/testlibs/utilities/test_data.go new file mode 100644 index 0000000..f79fe65 --- /dev/null +++ b/testlibs/utilities/test_data.go @@ -0,0 +1,9 @@ +package utilities + +import ( + "time" +) + +var ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node + +var DefaultPubsubTopic = "/waku/2/rs/3/0" diff --git a/testlibs/utilities/utils.go b/testlibs/utilities/utils.go index d6f2d36..59298e4 100644 --- a/testlibs/utilities/utils.go +++ b/testlibs/utilities/utils.go @@ -32,7 +32,6 @@ var DefaultWakuConfig = &waku.WakuConfig{ type WakuConfigOption func(*waku.WakuConfig) -var ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node func GenerateUniquePort() int { rng := rand.New(rand.NewSource(time.Now().UnixNano())) // Local RNG instance diff --git a/testlibs/waku-go-tests/Nodes_basic_test.go b/testlibs/waku-go-tests/Nodes_basic_test.go index e3ddb39..1f9ee54 100644 --- a/testlibs/waku-go-tests/Nodes_basic_test.go +++ b/testlibs/waku-go-tests/Nodes_basic_test.go @@ -46,29 +46,3 @@ func TestBasicWakuNodes(t *testing.T) { require.NoError(t, err, "Failed to stop+destroy Node 2") } -// Test to connect 2 nodes and disconnect them - -func TestConnectAndDisconnectNodes(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err, "failed to create logger") - - nodeA, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeA")) - require.NoError(t, err, "failed to create/start Node A") - defer nodeA.Wrappers_StopAndDestroy() // ensures cleanup - - nodeB, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeB")) - require.NoError(t, err, "failed to create/start Node B") - defer nodeB.Wrappers_StopAndDestroy() // ensures cleanup - - nodeC, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeB")) - require.NoError(t, err, "failed to create/start Node B") - defer nodeC.Wrappers_StopAndDestroy() // ensures cleanup - - err = nodeA.Wrappers_ConnectPeer(nodeB) - require.NoError(t, err, "failed to connect Node A to Node B") - - time.Sleep(3 * time.Second) - - err = nodeA.Wrappers_DisconnectPeer(nodeC) - require.NoError(t, err, "failed to disconnect Node A from Node B") -} diff --git a/testlibs/waku-go-tests/Peers_connection_test.go b/testlibs/waku-go-tests/Peers_connection_test.go new file mode 100644 index 0000000..e3fa985 --- /dev/null +++ b/testlibs/waku-go-tests/Peers_connection_test.go @@ -0,0 +1,76 @@ +package waku_go_tests + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + testlibs "github.com/waku-org/waku-go-bindings/testlibs/src" + utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities" + "go.uber.org/zap" +) + +// test node connect & disconnect peers +func TestDisconnectPeerNodes(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + // Create Node A + nodeA, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeA")) + require.NoError(t, err) + defer nodeA.Wrappers_StopAndDestroy() + + // Create Node B + nodeB, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeB")) + require.NoError(t, err) + defer nodeB.Wrappers_StopAndDestroy() + + // Connect Node A to Node B + err = nodeA.Wrappers_ConnectPeer(nodeB) + require.NoError(t, err, "failed to connect nodes") + + // Wait for 3 seconds + time.Sleep(3 * time.Second) + + // Disconnect Node A from Node B + err = nodeA.Wrappers_DisconnectPeer(nodeB) + require.NoError(t, err, "failed to disconnect nodes") +} + +func TestConnectMultipleNodesToSingleNode(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + utilities.Debug("Starting test to connect multiple nodes to a single node") + utilities.Debug("Create 3 nodes") + node1, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("Node1")) + require.NoError(t, err) + defer func() { + utilities.Debug("Stopping and destroying Node 1") + node1.Wrappers_StopAndDestroy() + }() + + node2, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("Node2")) + require.NoError(t, err) + defer func() { + utilities.Debug("Stopping and destroying Node 2") + node2.Wrappers_StopAndDestroy() + }() + + node3, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("Node3")) + require.NoError(t, err) + defer func() { + utilities.Debug("Stopping and destroying Node 3") + node3.Wrappers_StopAndDestroy() + }() + + utilities.Debug("Connecting Node 2 to Node 1") + err = node2.Wrappers_ConnectPeer(node1) + require.NoError(t, err) + + utilities.Debug("Connecting Node 3 to Node 1") + err = node3.Wrappers_ConnectPeer(node1) + require.NoError(t, err) + + utilities.Debug("Test completed successfully: multiple nodes connected to a single node") +} diff --git a/testlibs/waku-go-tests/relay_test.go b/testlibs/waku-go-tests/relay_test.go new file mode 100644 index 0000000..52828b7 --- /dev/null +++ b/testlibs/waku-go-tests/relay_test.go @@ -0,0 +1,50 @@ +package waku_go_tests + +import ( + "testing" + + "github.com/stretchr/testify/require" + testlibs "github.com/waku-org/waku-go-bindings/testlibs/src" + utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities" + "go.uber.org/zap" +) + +func TestRelaySubscribeToDefaultTopic(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + utilities.Debug("Starting test to verify relay subscription to the default pubsub topic") + + // Define the configuration with relay = true + wakuConfig := *utilities.DefaultWakuConfig + wakuConfig.Relay = true + + utilities.Debug("Creating a Waku node with relay enabled") + node, err := testlibs.Wrappers_StartWakuNode(&wakuConfig, logger.Named("TestNode")) + require.NoError(t, err) + defer func() { + utilities.Debug("Stopping and destroying the Waku node") + node.Wrappers_StopAndDestroy() + }() + + defaultPubsubTopic := utilities.DefaultPubsubTopic + utilities.Debug("Default pubsub topic retrieved", zap.String("topic", defaultPubsubTopic)) + + utilities.Debug("Fetching number of connected relay peers before subscription", zap.String("topic", defaultPubsubTopic)) + numPeersBefore, err := node.Wrappers_GetNumConnectedRelayPeers(defaultPubsubTopic) + require.NoError(t, err) + utilities.Debug("Number of connected relay peers before subscription", zap.Int("count", numPeersBefore)) + + utilities.Debug("Attempting to subscribe to the default pubsub topic", zap.String("topic", defaultPubsubTopic)) + err = node.Wrappers_RelaySubscribe(defaultPubsubTopic) + require.NoError(t, err) + + utilities.Debug("Fetching number of connected relay peers after subscription", zap.String("topic", defaultPubsubTopic)) + numPeersAfter, err := node.Wrappers_GetNumConnectedRelayPeers(defaultPubsubTopic) + require.NoError(t, err) + utilities.Debug("Number of connected relay peers after subscription", zap.Int("count", numPeersAfter)) + + require.Greater(t, numPeersAfter, numPeersBefore, "Number of connected relay peers should increase after subscription") + + utilities.Debug("Test successfully verified subscription to the default pubsub topic", zap.String("topic", defaultPubsubTopic)) +} diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 09db846..5e9ab59 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -566,10 +566,23 @@ func TestConnectionChange(t *testing.T) { TcpPort: 60060, } + wakuConfig3 := WakuConfig{ + Relay: false, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9060, + TcpPort: 60060, + } + node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) require.NoError(t, err) require.NoError(t, node1.Start()) + node3, err := NewWakuNode(&wakuConfig3, logger.Named("node3")) + require.NoError(t, err) + require.NoError(t, node3.Start()) // start node2 wakuConfig2 := WakuConfig{ Relay: true, @@ -602,32 +615,16 @@ func TestConnectionChange(t *testing.T) { require.NoError(t, err) require.True(t, peerCount2 == 1, "node2 should have 1 peer") - peerId1, err := node1.PeerID() + //peerId1, err := node1.PeerID() require.NoError(t, err) - - // Wait to receive connectionChange event - select { - case connectionChange := <-node2.ConnectionChangeChan: - require.NotNil(t, connectionChange, "connectionChange should be updated") - require.Equal(t, connectionChange.PeerEvent, "Joined", "connectionChange Joined event should be emitted") - require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId") - case <-time.After(10 * time.Second): - t.Fatal("Timeout: No connectionChange event received within 10 seconds") - } + peerId3, _ := node3.PeerID() // Disconnect from node1 - err = node2.DisconnectPeerByID(peerId1) + err = node1.DisconnectPeerByID(peerId3) + require.NoError(t, err) // Wait to receive connectionChange event - select { - case connectionChange := <-node2.ConnectionChangeChan: - require.NotNil(t, connectionChange, "connectionChange should be updated") - require.Equal(t, connectionChange.PeerEvent, "Left", "connectionChange Left event should be emitted") - require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId") - case <-time.After(10 * time.Second): - t.Fatal("Timeout: No connectionChange event received within 10 seconds") - } // Stop nodes require.NoError(t, node1.Stop())