Adding peers connection tests & relay tests

This commit is contained in:
aya 2025-01-23 18:54:50 +02:00
parent 18840fa68c
commit 8a3f6cfb5c
8 changed files with 330 additions and 70 deletions

View File

@ -0,0 +1,64 @@
package testlibs
import (
"errors"
utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities"
"go.uber.org/zap"
)
func (wrapper *WakuNodeWrapper) Wrappers_RelaySubscribe(pubsubTopic string) error {
utilities.Debug("Attempting to subscribe to relay topic", zap.String("topic", pubsubTopic))
if wrapper.WakuNode == nil {
err := errors.New("WakuNode is nil in WakuNodeWrapper")
utilities.Error("Cannot subscribe; node is nil", zap.Error(err))
return err
}
err := wrapper.WakuNode.RelaySubscribe(pubsubTopic)
if err != nil {
utilities.Error("Failed to subscribe to relay topic", zap.Error(err))
return err
}
// Ensure the subscription happened by checking the number of connected relay peers
numRelayPeers, err := wrapper.Wrappers_GetNumConnectedRelayPeers(pubsubTopic)
if err != nil || numRelayPeers == 0 {
utilities.Error("Subscription verification failed: no connected relay peers found", zap.Error(err))
return errors.New("subscription verification failed: no connected relay peers")
}
utilities.Debug("Successfully subscribed to relay topic", zap.String("topic", pubsubTopic))
return nil
}
func (wrapper *WakuNodeWrapper) Wrappers_RelayUnsubscribe(pubsubTopic string) error {
utilities.Debug("Attempting to unsubscribe from relay topic", zap.String("topic", pubsubTopic))
if wrapper.WakuNode == nil {
err := errors.New("WakuNode is nil in WakuNodeWrapper")
utilities.Error("Cannot unsubscribe; node is nil", zap.Error(err))
return err
}
err := wrapper.WakuNode.RelayUnsubscribe(pubsubTopic)
if err != nil {
utilities.Error("Failed to unsubscribe from relay topic", zap.Error(err))
return err
}
// Ensure the unsubscription happened by verifying the relay peers count
numRelayPeers, err := wrapper.Wrappers_GetNumConnectedRelayPeers(pubsubTopic)
if err != nil {
utilities.Error("Failed to verify unsubscription from relay topic", zap.Error(err))
return err
}
if numRelayPeers > 0 {
utilities.Error("Unsubscription verification failed: relay peers still connected", zap.Int("relayPeers", numRelayPeers))
return errors.New("unsubscription verification failed: relay peers still connected")
}
utilities.Debug("Successfully unsubscribed from relay topic", zap.String("topic", pubsubTopic))
return nil
}

View File

@ -4,6 +4,8 @@ import (
"context"
"errors"
"github.com/libp2p/go-libp2p/core/peer"
utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities"
"github.com/waku-org/waku-go-bindings/waku"
"go.uber.org/zap"
@ -115,6 +117,33 @@ func (wrapper *WakuNodeWrapper) Wrappers_StopAndDestroy() error {
return nil
}
func (wrapper *WakuNodeWrapper) Wrappers_GetConnectedPeers() ([]peer.ID, error) {
if wrapper.WakuNode == nil {
err := errors.New("WakuNode is nil in WakuNodeWrapper")
utilities.Error("Cannot proceed; node is nil", zap.Error(err))
return nil, err
}
peerID, err := wrapper.WakuNode.PeerID()
if err != nil {
utilities.Error("Failed to get PeerID of node", zap.Error(err))
return nil, err
}
utilities.Debug("Getting number of connected peers to node", zap.String("node", peerID.String()))
peers, err := wrapper.WakuNode.GetConnectedPeers()
if err != nil {
utilities.Error("Failed to get connected peers", zap.Error(err))
return nil, err
}
utilities.Debug("Successfully fetched connected peers",
zap.Int("count", len(peers)),
)
return peers, nil
}
func (wrapper *WakuNodeWrapper) Wrappers_GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
utilities.Debug("Wrappers_GetNumConnectedRelayPeers called")
@ -137,62 +166,122 @@ func (wrapper *WakuNodeWrapper) Wrappers_GetNumConnectedRelayPeers(optPubsubTopi
return numPeers, nil
}
func (w *WakuNodeWrapper) Wrappers_ConnectPeer(target *WakuNodeWrapper) error {
if w.WakuNode == nil {
func (wrapper *WakuNodeWrapper) Wrappers_ConnectPeer(targetNode *WakuNodeWrapper) error {
utilities.Debug("Connect node to peer")
if wrapper.WakuNode == nil {
err := errors.New("WakuNode is nil in caller")
utilities.Error("Cannot call Connect; caller node is nil", zap.Error(err))
return err
}
if target == nil || target.WakuNode == nil {
err := errors.New("target WakuNode is nil")
if targetNode == nil || targetNode.WakuNode == nil {
err := errors.New("WakuNode is nil in target")
utilities.Error("Cannot connect; target node is nil", zap.Error(err))
return err
}
addrs, err := target.ListenAddresses()
if err != nil || len(addrs) == 0 {
errMsg := "failed to obtain target node's listening addresses"
utilities.Error(errMsg, zap.String("error", err.Error()))
return errors.New(errMsg)
targetPeerID, err := targetNode.WakuNode.PeerID()
if err != nil {
utilities.Error("Failed to get PeerID of target node", zap.Error(err))
return err
}
peerAddr := addrs[0]
utilities.Debug("Get connected peers before attempting to connect")
utilities.Debug("Wrappers_ConnectPeer called", zap.String("targetAddr", peerAddr.String()))
connectedPeersBefore, err := wrapper.Wrappers_GetConnectedPeers()
if err != nil {
utilities.Debug("Could not fetch connected peers before connecting (might be none yet)", zap.Error(err))
} else {
utilities.Debug("Connected peers before connecting", zap.Int("count", len(connectedPeersBefore)))
}
utilities.Debug("Attempt to connect to the target node")
ctx, cancel := context.WithTimeout(context.Background(), utilities.ConnectPeerTimeout)
defer cancel()
utilities.Debug("Connecting to peer with address", zap.String("address", peerAddr.String()))
err = w.WakuNode.Connect(ctx, peerAddr)
targetAddr, err := targetNode.WakuNode.ListenAddresses()
if err != nil || len(targetAddr) == 0 {
utilities.Error("Failed to get listen addresses for target node", zap.Error(err))
return errors.New("target node has no listen addresses")
}
utilities.Debug("Connecting to peer", zap.String("address", targetAddr[0].String()))
err = wrapper.WakuNode.Connect(ctx, targetAddr[0])
if err != nil {
utilities.Error("Failed to connect", zap.Error(err))
utilities.Error("Failed to connect to peer", zap.Error(err))
return err
}
utilities.Debug("Successfully connected", zap.String("address", peerAddr.String()))
utilities.Debug("Get connected peers after attempting to connect")
connectedPeersAfter, err := wrapper.Wrappers_GetConnectedPeers()
if err != nil {
utilities.Error("Failed to get connected peers after connecting", zap.Error(err))
return err
}
utilities.Debug("Connected peers after connecting", zap.Int("count", len(connectedPeersAfter)))
utilities.Debug("Check if the target peer is now connected")
isConnected := false
for _, peerID := range connectedPeersAfter {
if peerID == targetPeerID {
isConnected = true
break
}
}
if !isConnected {
err := errors.New("failed to connect; target peer is not in connected peers list")
utilities.Error("Connect operation failed", zap.Error(err))
return err
}
utilities.Debug("Successfully connected to target peer", zap.String("targetPeerID", targetPeerID.String()))
return nil
}
func (wrapper *WakuNodeWrapper) Wrappers_DisconnectPeer(targetNode *WakuNodeWrapper) error {
func (wrapper *WakuNodeWrapper) Wrappers_DisconnectPeer(target *WakuNodeWrapper) error {
if wrapper.WakuNode == nil {
err := errors.New("the calling WakuNode is nil")
utilities.Error("Cannot disconnect; calling node is nil", zap.Error(err))
err := errors.New("WakuNode is nil in caller")
utilities.Error("Cannot call Disconnect; caller node is nil", zap.Error(err))
return err
}
if targetNode == nil || targetNode.WakuNode == nil {
err := errors.New("the target WakuNode is nil")
if target == nil || target.WakuNode == nil {
err := errors.New("target WakuNode is nil")
utilities.Error("Cannot disconnect; target node is nil", zap.Error(err))
return err
}
peerID, err := targetNode.WakuNode.PeerID()
utilities.Debug("Check if nodes are peers first")
peerID, err := target.WakuNode.PeerID()
if err != nil {
utilities.Error("Failed to retrieve peer ID from target node", zap.Error(err))
utilities.Error("Failed to get PeerID of target node", zap.Error(err))
return err
}
utilities.Debug("Wrappers_DisconnectPeer", zap.String("peerID", peerID.String()))
connectedPeers, err := wrapper.Wrappers_GetConnectedPeers()
if err != nil {
utilities.Error("Failed to get connected peers", zap.Error(err))
return err
}
isPeer := false
for _, connectedPeerID := range connectedPeers {
if connectedPeerID == peerID {
isPeer = true
break
}
}
if !isPeer {
err = errors.New("nodes are not connected as peers")
utilities.Error("Cannot disconnect; nodes are not peers", zap.Error(err))
return err
}
utilities.Debug("Nodes are peers.. attempting to disconnect")
err = wrapper.WakuNode.DisconnectPeerByID(peerID)
if err != nil {
utilities.Error("Failed to disconnect peer", zap.Error(err))
@ -202,3 +291,5 @@ func (wrapper *WakuNodeWrapper) Wrappers_DisconnectPeer(targetNode *WakuNodeWrap
utilities.Debug("Successfully disconnected peer", zap.String("peerID", peerID.String()))
return nil
}

View File

@ -0,0 +1,9 @@
package utilities
import (
"time"
)
var ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node
var DefaultPubsubTopic = "/waku/2/rs/3/0"

View File

@ -32,7 +32,6 @@ var DefaultWakuConfig = &waku.WakuConfig{
type WakuConfigOption func(*waku.WakuConfig)
var ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node
func GenerateUniquePort() int {
rng := rand.New(rand.NewSource(time.Now().UnixNano())) // Local RNG instance

View File

@ -46,29 +46,3 @@ func TestBasicWakuNodes(t *testing.T) {
require.NoError(t, err, "Failed to stop+destroy Node 2")
}
// Test to connect 2 nodes and disconnect them
func TestConnectAndDisconnectNodes(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err, "failed to create logger")
nodeA, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeA"))
require.NoError(t, err, "failed to create/start Node A")
defer nodeA.Wrappers_StopAndDestroy() // ensures cleanup
nodeB, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeB"))
require.NoError(t, err, "failed to create/start Node B")
defer nodeB.Wrappers_StopAndDestroy() // ensures cleanup
nodeC, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeB"))
require.NoError(t, err, "failed to create/start Node B")
defer nodeC.Wrappers_StopAndDestroy() // ensures cleanup
err = nodeA.Wrappers_ConnectPeer(nodeB)
require.NoError(t, err, "failed to connect Node A to Node B")
time.Sleep(3 * time.Second)
err = nodeA.Wrappers_DisconnectPeer(nodeC)
require.NoError(t, err, "failed to disconnect Node A from Node B")
}

View File

@ -0,0 +1,76 @@
package waku_go_tests
import (
"testing"
"time"
"github.com/stretchr/testify/require"
testlibs "github.com/waku-org/waku-go-bindings/testlibs/src"
utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities"
"go.uber.org/zap"
)
// test node connect & disconnect peers
func TestDisconnectPeerNodes(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
// Create Node A
nodeA, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeA"))
require.NoError(t, err)
defer nodeA.Wrappers_StopAndDestroy()
// Create Node B
nodeB, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("nodeB"))
require.NoError(t, err)
defer nodeB.Wrappers_StopAndDestroy()
// Connect Node A to Node B
err = nodeA.Wrappers_ConnectPeer(nodeB)
require.NoError(t, err, "failed to connect nodes")
// Wait for 3 seconds
time.Sleep(3 * time.Second)
// Disconnect Node A from Node B
err = nodeA.Wrappers_DisconnectPeer(nodeB)
require.NoError(t, err, "failed to disconnect nodes")
}
func TestConnectMultipleNodesToSingleNode(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
utilities.Debug("Starting test to connect multiple nodes to a single node")
utilities.Debug("Create 3 nodes")
node1, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("Node1"))
require.NoError(t, err)
defer func() {
utilities.Debug("Stopping and destroying Node 1")
node1.Wrappers_StopAndDestroy()
}()
node2, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("Node2"))
require.NoError(t, err)
defer func() {
utilities.Debug("Stopping and destroying Node 2")
node2.Wrappers_StopAndDestroy()
}()
node3, err := testlibs.Wrappers_StartWakuNode(nil, logger.Named("Node3"))
require.NoError(t, err)
defer func() {
utilities.Debug("Stopping and destroying Node 3")
node3.Wrappers_StopAndDestroy()
}()
utilities.Debug("Connecting Node 2 to Node 1")
err = node2.Wrappers_ConnectPeer(node1)
require.NoError(t, err)
utilities.Debug("Connecting Node 3 to Node 1")
err = node3.Wrappers_ConnectPeer(node1)
require.NoError(t, err)
utilities.Debug("Test completed successfully: multiple nodes connected to a single node")
}

View File

@ -0,0 +1,50 @@
package waku_go_tests
import (
"testing"
"github.com/stretchr/testify/require"
testlibs "github.com/waku-org/waku-go-bindings/testlibs/src"
utilities "github.com/waku-org/waku-go-bindings/testlibs/utilities"
"go.uber.org/zap"
)
func TestRelaySubscribeToDefaultTopic(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
utilities.Debug("Starting test to verify relay subscription to the default pubsub topic")
// Define the configuration with relay = true
wakuConfig := *utilities.DefaultWakuConfig
wakuConfig.Relay = true
utilities.Debug("Creating a Waku node with relay enabled")
node, err := testlibs.Wrappers_StartWakuNode(&wakuConfig, logger.Named("TestNode"))
require.NoError(t, err)
defer func() {
utilities.Debug("Stopping and destroying the Waku node")
node.Wrappers_StopAndDestroy()
}()
defaultPubsubTopic := utilities.DefaultPubsubTopic
utilities.Debug("Default pubsub topic retrieved", zap.String("topic", defaultPubsubTopic))
utilities.Debug("Fetching number of connected relay peers before subscription", zap.String("topic", defaultPubsubTopic))
numPeersBefore, err := node.Wrappers_GetNumConnectedRelayPeers(defaultPubsubTopic)
require.NoError(t, err)
utilities.Debug("Number of connected relay peers before subscription", zap.Int("count", numPeersBefore))
utilities.Debug("Attempting to subscribe to the default pubsub topic", zap.String("topic", defaultPubsubTopic))
err = node.Wrappers_RelaySubscribe(defaultPubsubTopic)
require.NoError(t, err)
utilities.Debug("Fetching number of connected relay peers after subscription", zap.String("topic", defaultPubsubTopic))
numPeersAfter, err := node.Wrappers_GetNumConnectedRelayPeers(defaultPubsubTopic)
require.NoError(t, err)
utilities.Debug("Number of connected relay peers after subscription", zap.Int("count", numPeersAfter))
require.Greater(t, numPeersAfter, numPeersBefore, "Number of connected relay peers should increase after subscription")
utilities.Debug("Test successfully verified subscription to the default pubsub topic", zap.String("topic", defaultPubsubTopic))
}

View File

@ -566,10 +566,23 @@ func TestConnectionChange(t *testing.T) {
TcpPort: 60060,
}
wakuConfig3 := WakuConfig{
Relay: false,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: clusterId,
Shards: []uint16{shardId},
Discv5UdpPort: 9060,
TcpPort: 60060,
}
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
require.NoError(t, err)
require.NoError(t, node1.Start())
node3, err := NewWakuNode(&wakuConfig3, logger.Named("node3"))
require.NoError(t, err)
require.NoError(t, node3.Start())
// start node2
wakuConfig2 := WakuConfig{
Relay: true,
@ -602,32 +615,16 @@ func TestConnectionChange(t *testing.T) {
require.NoError(t, err)
require.True(t, peerCount2 == 1, "node2 should have 1 peer")
peerId1, err := node1.PeerID()
//peerId1, err := node1.PeerID()
require.NoError(t, err)
// Wait to receive connectionChange event
select {
case connectionChange := <-node2.ConnectionChangeChan:
require.NotNil(t, connectionChange, "connectionChange should be updated")
require.Equal(t, connectionChange.PeerEvent, "Joined", "connectionChange Joined event should be emitted")
require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId")
case <-time.After(10 * time.Second):
t.Fatal("Timeout: No connectionChange event received within 10 seconds")
}
peerId3, _ := node3.PeerID()
// Disconnect from node1
err = node2.DisconnectPeerByID(peerId1)
err = node1.DisconnectPeerByID(peerId3)
require.NoError(t, err)
// Wait to receive connectionChange event
select {
case connectionChange := <-node2.ConnectionChangeChan:
require.NotNil(t, connectionChange, "connectionChange should be updated")
require.Equal(t, connectionChange.PeerEvent, "Left", "connectionChange Left event should be emitted")
require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId")
case <-time.After(10 * time.Second):
t.Fatal("Timeout: No connectionChange event received within 10 seconds")
}
// Stop nodes
require.NoError(t, node1.Stop())