Adding peer connections tests

This commit is contained in:
aya 2025-02-12 14:03:36 +02:00
parent 34c6d1ec93
commit 886357d3ee
3 changed files with 127 additions and 32 deletions

View File

@ -35,34 +35,35 @@ func TestNodeRestart(t *testing.T) {
Debug("Creating Node")
nodeConfig := DefaultWakuConfig
node, err := StartWakuNode("TestNode", &nodeConfig)
require.NoError(t, err)
require.NoError(t, err, "Failed to start Waku node")
defer node.StopAndDestroy()
Debug("Node started successfully")
Debug("Fetching ENR before stopping the node")
enrBefore := node.GetENR()
require.NotEmpty(t, enrBefore)
enrBefore, err := node.ENR()
require.NoError(t, err, "Failed to get ENR before stopping")
require.NotEmpty(t, enrBefore, "ENR should not be empty before stopping")
Debug("ENR before stopping: %s", enrBefore)
Debug("Stopping the Node")
err = node.Stop()
require.NoError(t, err)
require.NoError(t, err, "Failed to stop Waku node")
Debug("Node stopped successfully")
Debug("Restarting the Node")
err = node.Start()
require.NoError(t, err)
require.NoError(t, err, "Failed to restart Waku node")
Debug("Node restarted successfully")
Debug("Fetching ENR after restarting the node")
enrAfter := node.GetENR()
require.NotEmpty(t, enrAfter)
enrAfter, err := node.ENR()
require.NoError(t, err, "Failed to get ENR after restarting")
require.NotEmpty(t, enrAfter, "ENR should not be empty after restart")
Debug("ENR after restarting: %s", enrAfter)
Debug("Comparing ENRs before and after restart")
require.Equal(t, enrBefore, enrAfter, "ENR should remain the same after node restart")
Debug("Cleaning up: stopping and destroying the node")
defer node.StopAndDestroy()
Debug("TestNodeRestart completed successfully")
}

View File

@ -689,34 +689,44 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
return err
}
wg := sync.WaitGroup{}
timeout := 3 * time.Second
Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s, timeout: %v", n.nodeName, pubsubTopic, timeout)
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPubsubTopic = C.CString(pubsubTopic)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var resp = C.allocResp(nil)
defer C.freeResp(resp)
var cPubsubTopic = C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cPubsubTopic))
if n.wakuCtx == nil {
err := errors.New("wakuCtx is nil")
Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err)
return errors.New("wakuCtx is nil")
return err
}
wg.Add(1)
Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
done := make(chan struct{})
go func() {
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
close(done)
}()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
return nil
select {
case <-ctx.Done():
Error("RelaySubscribe timed out on node %s", n.nodeName)
return errors.New("relay subscribe operation timed out")
case <-done:
if C.getRet(resp) == C.RET_OK {
Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
return nil
}
}
errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
return errors.New(errMsg)
return errors.New("error WakuRelaySubscribe: " + errMsg)
}
func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error {
@ -1512,7 +1522,16 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
}
func ConnectAllPeers(nodes []*WakuNode) error {
Debug("Connecting nodes in a relay chain")
if len(nodes) == 0 {
Error("Cannot connect peers: node list is empty")
return errors.New("node list is empty")
}
timeout := time.Duration(len(nodes)*2) * time.Second
Debug("Connecting nodes in a relay chain with timeout: %v", timeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for i := 0; i < len(nodes)-1; i++ {
Debug("Connecting node %d to node %d", i, i+1)
@ -1523,14 +1542,17 @@ func ConnectAllPeers(nodes []*WakuNode) error {
}
}
time.Sleep(4 * time.Second)
Debug("Waiting for connections to stabilize")
<-ctx.Done()
Debug("Connections stabilized")
return nil
}
func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error {
Debug("Verifying if the message was received on node %s", n.nodeName)
timeout := 3 * time.Second
Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case envelope := <-n.MsgChan:
@ -1552,9 +1574,9 @@ func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expect
}
Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload))
return nil
case <-time.After(5 * time.Second):
Error("Timeout: message not received within 5 seconds on node %s", n.nodeName)
return errors.New("timeout: message not received within 5 seconds")
case <-ctx.Done():
Error("Timeout: message not received within %v on node %s", timeout, n.nodeName)
return errors.New("timeout: message not received within the given duration")
}
}

View File

@ -0,0 +1,72 @@
package waku
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
// Test node connect & disconnect peers
func TestDisconnectPeerNodes(t *testing.T) {
Debug("Starting TestDisconnectPeerNodes")
// Create Node A
nodeA, err := StartWakuNode("nodeA", nil)
require.NoError(t, err, "Failed to start Node A")
defer nodeA.StopAndDestroy()
// Create Node B
nodeB, err := StartWakuNode("nodeB", nil)
require.NoError(t, err, "Failed to start Node B")
defer nodeB.StopAndDestroy()
// Connect Node A to Node B
Debug("Connecting Node A to Node B")
err = nodeA.ConnectPeer(nodeB)
require.NoError(t, err, "Failed to connect nodes")
// Wait for 3 seconds
time.Sleep(3 * time.Second)
// Disconnect Node A from Node B
Debug("Disconnecting Node A from Node B")
err = nodeA.DisconnectPeer(nodeB)
require.NoError(t, err, "Failed to disconnect nodes")
}
func TestConnectMultipleNodesToSingleNode(t *testing.T) {
Debug("Starting TestConnectMultipleNodesToSingleNode")
Debug("Creating 3 nodes")
node1, err := StartWakuNode("Node1", nil)
require.NoError(t, err, "Failed to start Node 1")
defer func() {
Debug("Stopping and destroying Node 1")
node1.StopAndDestroy()
}()
node2, err := StartWakuNode("Node2", nil)
require.NoError(t, err, "Failed to start Node 2")
defer func() {
Debug("Stopping and destroying Node 2")
node2.StopAndDestroy()
}()
node3, err := StartWakuNode("Node3", nil)
require.NoError(t, err, "Failed to start Node 3")
defer func() {
Debug("Stopping and destroying Node 3")
node3.StopAndDestroy()
}()
Debug("Connecting Node 2 to Node 1")
err = node2.ConnectPeer(node1)
require.NoError(t, err, "Failed to connect Node 2 to Node 1")
Debug("Connecting Node 3 to Node 1")
err = node3.ConnectPeer(node1)
require.NoError(t, err, "Failed to connect Node 3 to Node 1")
Debug("Test completed successfully: multiple nodes connected to a single node")
}