From 886357d3ee5d1b4e6e064ec73857169ac18e0094 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 12 Feb 2025 14:03:36 +0200 Subject: [PATCH] Adding peer connections tests --- waku/nodes_basic_test.go | 21 +++++----- waku/nwaku.go | 66 +++++++++++++++++++++----------- waku/peer_connections_test.go | 72 +++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 32 deletions(-) create mode 100644 waku/peer_connections_test.go diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go index ae9205e..92cbe08 100644 --- a/waku/nodes_basic_test.go +++ b/waku/nodes_basic_test.go @@ -35,34 +35,35 @@ func TestNodeRestart(t *testing.T) { Debug("Creating Node") nodeConfig := DefaultWakuConfig node, err := StartWakuNode("TestNode", &nodeConfig) - require.NoError(t, err) + require.NoError(t, err, "Failed to start Waku node") + defer node.StopAndDestroy() + Debug("Node started successfully") Debug("Fetching ENR before stopping the node") - enrBefore := node.GetENR() - require.NotEmpty(t, enrBefore) + enrBefore, err := node.ENR() + require.NoError(t, err, "Failed to get ENR before stopping") + require.NotEmpty(t, enrBefore, "ENR should not be empty before stopping") Debug("ENR before stopping: %s", enrBefore) Debug("Stopping the Node") err = node.Stop() - require.NoError(t, err) + require.NoError(t, err, "Failed to stop Waku node") Debug("Node stopped successfully") Debug("Restarting the Node") err = node.Start() - require.NoError(t, err) + require.NoError(t, err, "Failed to restart Waku node") Debug("Node restarted successfully") Debug("Fetching ENR after restarting the node") - enrAfter := node.GetENR() - require.NotEmpty(t, enrAfter) + enrAfter, err := node.ENR() + require.NoError(t, err, "Failed to get ENR after restarting") + require.NotEmpty(t, enrAfter, "ENR should not be empty after restart") Debug("ENR after restarting: %s", enrAfter) Debug("Comparing ENRs before and after restart") require.Equal(t, enrBefore, enrAfter, "ENR should remain the same after node restart") - Debug("Cleaning up: stopping and destroying the node") - defer node.StopAndDestroy() - Debug("TestNodeRestart completed successfully") } diff --git a/waku/nwaku.go b/waku/nwaku.go index b3ff294..5b47b88 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -689,34 +689,44 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { return err } - wg := sync.WaitGroup{} + timeout := 3 * time.Second + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s, timeout: %v", n.nodeName, pubsubTopic, timeout) - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(pubsubTopic) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + var resp = C.allocResp(nil) defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) defer C.free(unsafe.Pointer(cPubsubTopic)) if n.wakuCtx == nil { err := errors.New("wakuCtx is nil") Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) - return errors.New("wakuCtx is nil") + return err } - wg.Add(1) - Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() + done := make(chan struct{}) + go func() { + C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + close(done) + }() - if C.getRet(resp) == C.RET_OK { - - Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - return nil + select { + case <-ctx.Done(): + Error("RelaySubscribe timed out on node %s", n.nodeName) + return errors.New("relay subscribe operation timed out") + case <-done: + if C.getRet(resp) == C.RET_OK { + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + return nil + } } - errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) - return errors.New(errMsg) + return errors.New("error WakuRelaySubscribe: " + errMsg) } func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error { @@ -1512,7 +1522,16 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { } func ConnectAllPeers(nodes []*WakuNode) error { - Debug("Connecting nodes in a relay chain") + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() for i := 0; i < len(nodes)-1; i++ { Debug("Connecting node %d to node %d", i, i+1) @@ -1523,14 +1542,17 @@ func ConnectAllPeers(nodes []*WakuNode) error { } } - time.Sleep(4 * time.Second) - Debug("Waiting for connections to stabilize") - + <-ctx.Done() + Debug("Connections stabilized") return nil } func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { - Debug("Verifying if the message was received on node %s", n.nodeName) + timeout := 3 * time.Second + Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() select { case envelope := <-n.MsgChan: @@ -1552,9 +1574,9 @@ func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expect } Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) return nil - case <-time.After(5 * time.Second): - Error("Timeout: message not received within 5 seconds on node %s", n.nodeName) - return errors.New("timeout: message not received within 5 seconds") + case <-ctx.Done(): + Error("Timeout: message not received within %v on node %s", timeout, n.nodeName) + return errors.New("timeout: message not received within the given duration") } } diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go new file mode 100644 index 0000000..86a531b --- /dev/null +++ b/waku/peer_connections_test.go @@ -0,0 +1,72 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Test node connect & disconnect peers +func TestDisconnectPeerNodes(t *testing.T) { + Debug("Starting TestDisconnectPeerNodes") + + // Create Node A + nodeA, err := StartWakuNode("nodeA", nil) + require.NoError(t, err, "Failed to start Node A") + defer nodeA.StopAndDestroy() + + // Create Node B + nodeB, err := StartWakuNode("nodeB", nil) + require.NoError(t, err, "Failed to start Node B") + defer nodeB.StopAndDestroy() + + // Connect Node A to Node B + Debug("Connecting Node A to Node B") + err = nodeA.ConnectPeer(nodeB) + require.NoError(t, err, "Failed to connect nodes") + + // Wait for 3 seconds + time.Sleep(3 * time.Second) + + // Disconnect Node A from Node B + Debug("Disconnecting Node A from Node B") + err = nodeA.DisconnectPeer(nodeB) + require.NoError(t, err, "Failed to disconnect nodes") +} + +func TestConnectMultipleNodesToSingleNode(t *testing.T) { + Debug("Starting TestConnectMultipleNodesToSingleNode") + + Debug("Creating 3 nodes") + node1, err := StartWakuNode("Node1", nil) + require.NoError(t, err, "Failed to start Node 1") + defer func() { + Debug("Stopping and destroying Node 1") + node1.StopAndDestroy() + }() + + node2, err := StartWakuNode("Node2", nil) + require.NoError(t, err, "Failed to start Node 2") + defer func() { + Debug("Stopping and destroying Node 2") + node2.StopAndDestroy() + }() + + node3, err := StartWakuNode("Node3", nil) + require.NoError(t, err, "Failed to start Node 3") + defer func() { + Debug("Stopping and destroying Node 3") + node3.StopAndDestroy() + }() + + Debug("Connecting Node 2 to Node 1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node 2 to Node 1") + + Debug("Connecting Node 3 to Node 1") + err = node3.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node 3 to Node 1") + + Debug("Test completed successfully: multiple nodes connected to a single node") +}