From 0b007a22027ac9b21743a7f4d6039bfb6f3e614b Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 12 Feb 2025 12:57:38 +0200 Subject: [PATCH 01/12] Adding additional APIs --- waku/nodes_basic_test.go | 0 waku/nwaku.go | 124 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 waku/nodes_basic_test.go diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go new file mode 100644 index 0000000..e69de29 diff --git a/waku/nwaku.go b/waku/nwaku.go index e3dc669..b3ff294 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -334,6 +334,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) const requestTimeout = 30 * time.Second @@ -683,7 +684,9 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to subscribe to relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -695,18 +698,24 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { defer C.free(unsafe.Pointer(cPubsubTopic)) if n.wakuCtx == nil { + err := errors.New("wakuCtx is nil") + Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) return errors.New("wakuCtx is nil") } wg.Add(1) + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) return errors.New(errMsg) } @@ -743,7 +752,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to unsubscribe from relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -759,15 +770,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { } wg.Add(1) + Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelayUnsubscribe: " + errMsg) } func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { @@ -783,18 +798,21 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) if err != nil { + Error("Failed to parse number of received peers: %v", err) return 0, err } return numRecvPeers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("PeerExchangeRequest failed: %v", errMsg) return 0, errors.New(errMsg) } func (n *WakuNode) StartDiscV5() error { - wg := sync.WaitGroup{} + Debug("Starting DiscV5 for node: %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -802,9 +820,11 @@ func (n *WakuNode) StartDiscV5() error { C.cGoWakuStartDiscV5(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully started DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -819,9 +839,11 @@ func (n *WakuNode) StopDiscV5() error { wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -837,11 +859,13 @@ func (n *WakuNode) Version() (string, error) { if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) return version, nil } errMsg := "error WakuVersion: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) return "", errors.New(errMsg) } @@ -918,6 +942,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu return common.MessageHash(""), errors.New(errMsg) } +func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) { + if n == nil { + err := errors.New("cannot publish message; node is nil") + Error("Failed to publish message via relay: %v", err) + return "", err + } + + // Handling context internally with a timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Attempting to publish message via relay on node %s", n.nodeName) + + msgHash, err := n.RelayPublish(ctx, message, pubsubTopic) + if err != nil { + Error("Failed to publish message via relay on node %s: %v", n.nodeName, err) + return "", err + } + + Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String()) + return msgHash, nil +} + func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} @@ -1463,3 +1510,70 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } + +func ConnectAllPeers(nodes []*WakuNode) error { + Debug("Connecting nodes in a relay chain") + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + time.Sleep(4 * time.Second) + Debug("Waiting for connections to stabilize") + + return nil +} + +func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { + Debug("Verifying if the message was received on node %s", n.nodeName) + + select { + case envelope := <-n.MsgChan: + if envelope == nil { + Error("Received envelope is nil on node %s", n.nodeName) + return errors.New("received envelope is nil") + } + if string(expectedMessage.Payload) != string(envelope.Message().Payload) { + Error("Payload does not match on node %s", n.nodeName) + return errors.New("payload does not match") + } + if expectedMessage.ContentTopic != envelope.Message().ContentTopic { + Error("Content topic does not match on node %s", n.nodeName) + return errors.New("content topic does not match") + } + if expectedHash != envelope.Hash() { + Error("Message hash does not match on node %s", n.nodeName) + return errors.New("message hash does not match") + } + Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) + return nil + case <-time.After(5 * time.Second): + Error("Timeout: message not received within 5 seconds on node %s", n.nodeName) + return errors.New("timeout: message not received within 5 seconds") + } +} + +func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { + Debug("Creating a WakuMessage on node %s", n.nodeName) + + if len(customMessage) > 0 && customMessage[0] != nil { + Debug("Using provided custom message on node %s", n.nodeName) + return customMessage[0] + } + + Debug("Using default message format on node %s", n.nodeName) + defaultMessage := &pb.WakuMessage{ + Payload: []byte("This is a default Waku message payload"), + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + Debug("Successfully created a default WakuMessage on node %s", n.nodeName) + return defaultMessage +} From 34c6d1ec936d2c63f075264ce1ec33956901ed56 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 12 Feb 2025 13:21:47 +0200 Subject: [PATCH 02/12] Add basic node tests --- waku/nodes_basic_test.go | 68 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go index e69de29..ae9205e 100644 --- a/waku/nodes_basic_test.go +++ b/waku/nodes_basic_test.go @@ -0,0 +1,68 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBasicWakuNodes(t *testing.T) { + Debug("Starting TestBasicWakuNodes") + + nodeCfg := DefaultWakuConfig + nodeCfg.Relay = true + + Debug("Starting the WakuNode") + node, err := StartWakuNode("node", &nodeCfg) + require.NoError(t, err, "Failed to create the WakuNode") + + // Use defer to ensure proper cleanup + defer func() { + Debug("Stopping and destroying Node") + node.StopAndDestroy() + }() + + Debug("Successfully created the WakuNode") + time.Sleep(2 * time.Second) + + Debug("TestBasicWakuNodes completed successfully") +} + +func TestNodeRestart(t *testing.T) { + Debug("Starting TestNodeRestart") + + Debug("Creating Node") + nodeConfig := DefaultWakuConfig + node, err := StartWakuNode("TestNode", &nodeConfig) + require.NoError(t, err) + Debug("Node started successfully") + + Debug("Fetching ENR before stopping the node") + enrBefore := node.GetENR() + require.NotEmpty(t, enrBefore) + Debug("ENR before stopping: %s", enrBefore) + + Debug("Stopping the Node") + err = node.Stop() + require.NoError(t, err) + Debug("Node stopped successfully") + + Debug("Restarting the Node") + err = node.Start() + require.NoError(t, err) + Debug("Node restarted successfully") + + Debug("Fetching ENR after restarting the node") + enrAfter := node.GetENR() + require.NotEmpty(t, enrAfter) + Debug("ENR after restarting: %s", enrAfter) + + Debug("Comparing ENRs before and after restart") + require.Equal(t, enrBefore, enrAfter, "ENR should remain the same after node restart") + + Debug("Cleaning up: stopping and destroying the node") + defer node.StopAndDestroy() + + Debug("TestNodeRestart completed successfully") +} From 886357d3ee5d1b4e6e064ec73857169ac18e0094 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 12 Feb 2025 14:03:36 +0200 Subject: [PATCH 03/12] Adding peer connections tests --- waku/nodes_basic_test.go | 21 +++++----- waku/nwaku.go | 66 +++++++++++++++++++++----------- waku/peer_connections_test.go | 72 +++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 32 deletions(-) create mode 100644 waku/peer_connections_test.go diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go index ae9205e..92cbe08 100644 --- a/waku/nodes_basic_test.go +++ b/waku/nodes_basic_test.go @@ -35,34 +35,35 @@ func TestNodeRestart(t *testing.T) { Debug("Creating Node") nodeConfig := DefaultWakuConfig node, err := StartWakuNode("TestNode", &nodeConfig) - require.NoError(t, err) + require.NoError(t, err, "Failed to start Waku node") + defer node.StopAndDestroy() + Debug("Node started successfully") Debug("Fetching ENR before stopping the node") - enrBefore := node.GetENR() - require.NotEmpty(t, enrBefore) + enrBefore, err := node.ENR() + require.NoError(t, err, "Failed to get ENR before stopping") + require.NotEmpty(t, enrBefore, "ENR should not be empty before stopping") Debug("ENR before stopping: %s", enrBefore) Debug("Stopping the Node") err = node.Stop() - require.NoError(t, err) + require.NoError(t, err, "Failed to stop Waku node") Debug("Node stopped successfully") Debug("Restarting the Node") err = node.Start() - require.NoError(t, err) + require.NoError(t, err, "Failed to restart Waku node") Debug("Node restarted successfully") Debug("Fetching ENR after restarting the node") - enrAfter := node.GetENR() - require.NotEmpty(t, enrAfter) + enrAfter, err := node.ENR() + require.NoError(t, err, "Failed to get ENR after restarting") + require.NotEmpty(t, enrAfter, "ENR should not be empty after restart") Debug("ENR after restarting: %s", enrAfter) Debug("Comparing ENRs before and after restart") require.Equal(t, enrBefore, enrAfter, "ENR should remain the same after node restart") - Debug("Cleaning up: stopping and destroying the node") - defer node.StopAndDestroy() - Debug("TestNodeRestart completed successfully") } diff --git a/waku/nwaku.go b/waku/nwaku.go index b3ff294..5b47b88 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -689,34 +689,44 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { return err } - wg := sync.WaitGroup{} + timeout := 3 * time.Second + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s, timeout: %v", n.nodeName, pubsubTopic, timeout) - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(pubsubTopic) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + var resp = C.allocResp(nil) defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) defer C.free(unsafe.Pointer(cPubsubTopic)) if n.wakuCtx == nil { err := errors.New("wakuCtx is nil") Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) - return errors.New("wakuCtx is nil") + return err } - wg.Add(1) - Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() + done := make(chan struct{}) + go func() { + C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + close(done) + }() - if C.getRet(resp) == C.RET_OK { - - Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - return nil + select { + case <-ctx.Done(): + Error("RelaySubscribe timed out on node %s", n.nodeName) + return errors.New("relay subscribe operation timed out") + case <-done: + if C.getRet(resp) == C.RET_OK { + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + return nil + } } - errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) - return errors.New(errMsg) + return errors.New("error WakuRelaySubscribe: " + errMsg) } func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error { @@ -1512,7 +1522,16 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { } func ConnectAllPeers(nodes []*WakuNode) error { - Debug("Connecting nodes in a relay chain") + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() for i := 0; i < len(nodes)-1; i++ { Debug("Connecting node %d to node %d", i, i+1) @@ -1523,14 +1542,17 @@ func ConnectAllPeers(nodes []*WakuNode) error { } } - time.Sleep(4 * time.Second) - Debug("Waiting for connections to stabilize") - + <-ctx.Done() + Debug("Connections stabilized") return nil } func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { - Debug("Verifying if the message was received on node %s", n.nodeName) + timeout := 3 * time.Second + Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() select { case envelope := <-n.MsgChan: @@ -1552,9 +1574,9 @@ func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expect } Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) return nil - case <-time.After(5 * time.Second): - Error("Timeout: message not received within 5 seconds on node %s", n.nodeName) - return errors.New("timeout: message not received within 5 seconds") + case <-ctx.Done(): + Error("Timeout: message not received within %v on node %s", timeout, n.nodeName) + return errors.New("timeout: message not received within the given duration") } } diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go new file mode 100644 index 0000000..86a531b --- /dev/null +++ b/waku/peer_connections_test.go @@ -0,0 +1,72 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Test node connect & disconnect peers +func TestDisconnectPeerNodes(t *testing.T) { + Debug("Starting TestDisconnectPeerNodes") + + // Create Node A + nodeA, err := StartWakuNode("nodeA", nil) + require.NoError(t, err, "Failed to start Node A") + defer nodeA.StopAndDestroy() + + // Create Node B + nodeB, err := StartWakuNode("nodeB", nil) + require.NoError(t, err, "Failed to start Node B") + defer nodeB.StopAndDestroy() + + // Connect Node A to Node B + Debug("Connecting Node A to Node B") + err = nodeA.ConnectPeer(nodeB) + require.NoError(t, err, "Failed to connect nodes") + + // Wait for 3 seconds + time.Sleep(3 * time.Second) + + // Disconnect Node A from Node B + Debug("Disconnecting Node A from Node B") + err = nodeA.DisconnectPeer(nodeB) + require.NoError(t, err, "Failed to disconnect nodes") +} + +func TestConnectMultipleNodesToSingleNode(t *testing.T) { + Debug("Starting TestConnectMultipleNodesToSingleNode") + + Debug("Creating 3 nodes") + node1, err := StartWakuNode("Node1", nil) + require.NoError(t, err, "Failed to start Node 1") + defer func() { + Debug("Stopping and destroying Node 1") + node1.StopAndDestroy() + }() + + node2, err := StartWakuNode("Node2", nil) + require.NoError(t, err, "Failed to start Node 2") + defer func() { + Debug("Stopping and destroying Node 2") + node2.StopAndDestroy() + }() + + node3, err := StartWakuNode("Node3", nil) + require.NoError(t, err, "Failed to start Node 3") + defer func() { + Debug("Stopping and destroying Node 3") + node3.StopAndDestroy() + }() + + Debug("Connecting Node 2 to Node 1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node 2 to Node 1") + + Debug("Connecting Node 3 to Node 1") + err = node3.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node 3 to Node 1") + + Debug("Test completed successfully: multiple nodes connected to a single node") +} From d108ed15f068c2698d011b69e986afe94be92ad2 Mon Sep 17 00:00:00 2001 From: aya Date: Thu, 13 Feb 2025 11:31:08 +0200 Subject: [PATCH 04/12] Enhance Peer tests --- waku/nwaku.go | 8 +++++++ waku/peer_connections_test.go | 42 ++++++++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 5b47b88..6f08b07 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -1422,7 +1422,15 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { var nodeCfg WakuConfig if customCfg == nil { + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports + } + nodeCfg = DefaultWakuConfig + nodeCfg.TcpPort = tcpPort + nodeCfg.Discv5UdpPort = udpPort } else { nodeCfg = *customCfg } diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go index 86a531b..1b3766e 100644 --- a/waku/peer_connections_test.go +++ b/waku/peer_connections_test.go @@ -1,6 +1,7 @@ package waku import ( + "slices" "testing" "time" @@ -11,49 +12,59 @@ import ( func TestDisconnectPeerNodes(t *testing.T) { Debug("Starting TestDisconnectPeerNodes") - // Create Node A nodeA, err := StartWakuNode("nodeA", nil) require.NoError(t, err, "Failed to start Node A") defer nodeA.StopAndDestroy() - // Create Node B nodeB, err := StartWakuNode("nodeB", nil) require.NoError(t, err, "Failed to start Node B") defer nodeB.StopAndDestroy() - // Connect Node A to Node B Debug("Connecting Node A to Node B") err = nodeA.ConnectPeer(nodeB) require.NoError(t, err, "Failed to connect nodes") - // Wait for 3 seconds + Debug("Verifying connection between Node A and Node B") + connectedPeers, err := nodeA.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node A") + nodeBPeerID, err := nodeB.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node B") + require.True(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should be a peer of Node A before disconnection") + time.Sleep(3 * time.Second) - // Disconnect Node A from Node B Debug("Disconnecting Node A from Node B") err = nodeA.DisconnectPeer(nodeB) require.NoError(t, err, "Failed to disconnect nodes") + + Debug("Verifying disconnection between Node A and Node B") + connectedPeers, err = nodeA.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node A after disconnection") + require.False(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should no longer be a peer of Node A after disconnection") + + Debug("Test completed successfully: Node B was disconnected from Node A") } func TestConnectMultipleNodesToSingleNode(t *testing.T) { Debug("Starting TestConnectMultipleNodesToSingleNode") - Debug("Creating 3 nodes") - node1, err := StartWakuNode("Node1", nil) + Debug("Creating 3 nodes with automatically assigned ports") + + node1, err := StartWakuNode("node1", nil) require.NoError(t, err, "Failed to start Node 1") defer func() { Debug("Stopping and destroying Node 1") node1.StopAndDestroy() }() - node2, err := StartWakuNode("Node2", nil) + node2, err := StartWakuNode("node2", nil) require.NoError(t, err, "Failed to start Node 2") defer func() { Debug("Stopping and destroying Node 2") node2.StopAndDestroy() }() - node3, err := StartWakuNode("Node3", nil) + node3, err := StartWakuNode("node3", nil) require.NoError(t, err, "Failed to start Node 3") defer func() { Debug("Stopping and destroying Node 3") @@ -68,5 +79,16 @@ func TestConnectMultipleNodesToSingleNode(t *testing.T) { err = node3.ConnectPeer(node1) require.NoError(t, err, "Failed to connect Node 3 to Node 1") - Debug("Test completed successfully: multiple nodes connected to a single node") + Debug("Verifying connected peers for Node 3") + connectedPeers, err := node3.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node 3") + node1PeerID, err := node1.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 1") + node2PeerID, err := node2.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 2") + + require.True(t, slices.Contains(connectedPeers, node1PeerID), "Node 1 should be a peer of Node 3") + require.True(t, slices.Contains(connectedPeers, node2PeerID), "Node 2 should be a peer of Node 3") + + Debug("Test completed successfully: multiple nodes connected to a single node and verified peers") } From 0ecd2580dcc239c5220cc71b3208bedafea5a99b Mon Sep 17 00:00:00 2001 From: aya Date: Thu, 13 Feb 2025 12:24:33 +0200 Subject: [PATCH 05/12] Adding relay tests --- waku/nwaku.go | 18 +-- waku/peer_connections_test.go | 1 - waku/relay_test.go | 234 ++++++++++++++++++++++++++++++++++ 3 files changed, 244 insertions(+), 9 deletions(-) create mode 100644 waku/relay_test.go diff --git a/waku/nwaku.go b/waku/nwaku.go index 6f08b07..acdefca 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -1422,19 +1422,21 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { var nodeCfg WakuConfig if customCfg == nil { - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) - if err != nil { - Error("Failed to allocate unique ports: %v", err) - tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports - } - nodeCfg = DefaultWakuConfig - nodeCfg.TcpPort = tcpPort - nodeCfg.Discv5UdpPort = udpPort } else { nodeCfg = *customCfg } + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports + } + + nodeCfg = DefaultWakuConfig + nodeCfg.TcpPort = tcpPort + nodeCfg.Discv5UdpPort = udpPort + Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) if err != nil { diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go index 1b3766e..5576a16 100644 --- a/waku/peer_connections_test.go +++ b/waku/peer_connections_test.go @@ -41,7 +41,6 @@ func TestDisconnectPeerNodes(t *testing.T) { connectedPeers, err = nodeA.GetConnectedPeers() require.NoError(t, err, "Failed to get connected peers for Node A after disconnection") require.False(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should no longer be a peer of Node A after disconnection") - Debug("Test completed successfully: Node B was disconnected from Node A") } diff --git a/waku/relay_test.go b/waku/relay_test.go new file mode 100644 index 0000000..20c928d --- /dev/null +++ b/waku/relay_test.go @@ -0,0 +1,234 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" +) + +func TestRelaySubscribeToDefaultTopic(t *testing.T) { + Debug("Starting test to verify relay subscription to the default pubsub topic") + + wakuConfig := DefaultWakuConfig + wakuConfig.Relay = true + + Debug("Creating a Waku node with relay enabled") + node, err := StartWakuNode("TestNode", &wakuConfig) + require.NoError(t, err) + defer func() { + Debug("Stopping and destroying the Waku node") + node.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + Debug("Attempting to subscribe to the default pubsub topic %s", defaultPubsubTopic) + err = RetryWithBackOff(func() error { + return node.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Test successfully verified subscription to the default pubsub topic: %s", defaultPubsubTopic) +} + +func TestRelayMessageTransmission(t *testing.T) { + Debug("Starting TestRelayMessageTransmission") + + senderConfig := DefaultWakuConfig + senderConfig.Relay = true + senderNode, err := StartWakuNode("SenderNode", &senderConfig) + require.NoError(t, err) + defer senderNode.StopAndDestroy() + + receiverConfig := DefaultWakuConfig + receiverConfig.Relay = true + receiverNode, err := StartWakuNode("ReceiverNode", &receiverConfig) + require.NoError(t, err) + defer receiverNode.StopAndDestroy() + + Debug("Connecting sender and receiver") + err = RetryWithBackOff(func() error { + return senderNode.ConnectPeer(receiverNode) + }) + require.NoError(t, err) + + Debug("Subscribing receiver to the default pubsub topic") + defaultPubsubTopic := DefaultPubsubTopic + err = RetryWithBackOff(func() error { + return receiverNode.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Creating and publishing message") + message := senderNode.CreateMessage() + var msgHash string + err = RetryWithBackOff(func() error { + var err error + msgHashObj, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + if err == nil { + msgHash = msgHashObj.String() + } + return err + + }) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Waiting to ensure message delivery") + time.Sleep(2 * time.Second) + + Debug("Verifying message reception") + err = RetryWithBackOff(func() error { + msgHashObj, _ := common.ToMessageHash(msgHash) + return receiverNode.VerifyMessageReceived(message, msgHashObj) + }) + require.NoError(t, err, "message verification failed") + + Debug("TestRelayMessageTransmission completed successfully") +} + +func TestRelayMessageBroadcast(t *testing.T) { + Debug("Starting TestRelayMessageBroadcast") + + numPeers := 5 + nodes := make([]*WakuNode, numPeers) + nodeNames := []string{"SenderNode", "PeerNode1", "PeerNode2", "PeerNode3", "PeerNode4"} + defaultPubsubTopic := DefaultPubsubTopic + + for i := 0; i < numPeers; i++ { + Debug("Creating node %s", nodeNames[i]) + nodeConfig := DefaultWakuConfig + nodeConfig.Relay = true + node, err := StartWakuNode(nodeNames[i], &nodeConfig) + require.NoError(t, err) + defer node.StopAndDestroy() + nodes[i] = node + } + + err := ConnectAllPeers(nodes) + require.NoError(t, err) + + Debug("Subscribing nodes to the default pubsub topic") + for _, node := range nodes { + err := node.RelaySubscribe(defaultPubsubTopic) + require.NoError(t, err) + } + + senderNode := nodes[0] + Debug("SenderNode is publishing a message") + message := senderNode.CreateMessage() + msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Waiting to ensure message delivery") + time.Sleep(3 * time.Second) + + Debug("Verifying message reception for each node") + for i, node := range nodes { + Debug("Verifying message for node %s", nodeNames[i]) + err := node.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "message verification failed for node: %s", nodeNames[i]) + } + + Debug("TestRelayMessageBroadcast completed successfully") +} + +func TestSendmsgInvalidPayload(t *testing.T) { + Debug("Starting TestInvalidMessageFormat") + + nodeNames := []string{"SenderNode", "PeerNode1"} + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Creating nodes") + senderNodeConfig := DefaultWakuConfig + senderNodeConfig.Relay = true + senderNode, err := StartWakuNode(nodeNames[0], &senderNodeConfig) + require.NoError(t, err) + defer senderNode.StopAndDestroy() + + receiverNodeConfig := DefaultWakuConfig + receiverNodeConfig.Relay = true + receiverNode, err := StartWakuNode(nodeNames[1], &receiverNodeConfig) + require.NoError(t, err) + defer receiverNode.StopAndDestroy() + + Debug("Connecting SenderNode and PeerNode1") + err = senderNode.ConnectPeer(receiverNode) + require.NoError(t, err) + + Debug("Subscribing SenderNode to the default pubsub topic") + err = senderNode.RelaySubscribe(defaultPubsubTopic) + require.NoError(t, err) + + Debug("SenderNode is publishing an invalid message") + invalidMessage := &pb.WakuMessage{ + Payload: []byte{}, // Empty payload + Version: proto.Uint32(0), + } + + message := senderNode.CreateMessage(invalidMessage) + msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + + Debug("Verifying if message was sent or failed") + if err != nil { + Debug("Message was not sent due to invalid format: %v", err) + require.Error(t, err, "message should fail due to invalid format") + } else { + Debug("Message was unexpectedly sent: %s", msgHash.String()) + require.Fail(t, "message with invalid format should not be sent") + } + + Debug("TestInvalidMessageFormat completed") +} + +func TestMessageNotReceivedWithoutRelay(t *testing.T) { + Debug("Starting TestMessageNotReceivedWithoutRelay") + + Debug("Creating Sender Node with Relay disabled") + senderConfig := DefaultWakuConfig + senderConfig.Relay = true + senderNode, err := StartWakuNode("SenderNode", &senderConfig) + require.NoError(t, err) + defer senderNode.StopAndDestroy() + + Debug("Creating Receiver Node with Relay disabled") + receiverConfig := DefaultWakuConfig + receiverConfig.Relay = true + receiverNode, err := StartWakuNode("ReceiverNode", &receiverConfig) + require.NoError(t, err) + defer receiverNode.StopAndDestroy() + + Debug("Connecting Sender and Receiver") + err = senderNode.ConnectPeer(receiverNode) + require.NoError(t, err) + + Debug("Subscribing Receiver to the default pubsub topic") + defaultPubsubTopic := DefaultPubsubTopic + err = senderNode.RelaySubscribe(defaultPubsubTopic) + require.NoError(t, err) + err = receiverNode.RelaySubscribe(defaultPubsubTopic) + require.NoError(t, err) + + Debug("SenderNode is publishing a message") + message := senderNode.CreateMessage() + msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Waiting to ensure message delivery") + time.Sleep(3 * time.Second) + + Debug("Verifying that Receiver did NOT receive the message") + err = receiverNode.VerifyMessageReceived(message, msgHash) + if err == nil { + t.Fatalf("Test failed: ReceiverNode SHOULD NOT have received the message") + } + + Debug("TestMessageNotReceivedWithoutRelay completed successfully") +} From 77c50b02ea6b376ff3393ce8fd1bb37417df6c7a Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 16 Feb 2025 08:36:40 +0200 Subject: [PATCH 06/12] Fix failing issue --- waku/nwaku.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index acdefca..0105789 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -1422,6 +1422,7 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { var nodeCfg WakuConfig if customCfg == nil { + nodeCfg = DefaultWakuConfig } else { nodeCfg = *customCfg @@ -1433,7 +1434,6 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports } - nodeCfg = DefaultWakuConfig nodeCfg.TcpPort = tcpPort nodeCfg.Discv5UdpPort = udpPort From 8c2ef843687a16d42d35bd36ae3110ee6b7059d7 Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 16 Feb 2025 13:30:57 +0200 Subject: [PATCH 07/12] Add new swt of relay tests --- waku/nwaku.go | 90 +++++++--- waku/relay_test.go | 420 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 428 insertions(+), 82 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 0105789..c7d269e 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -657,7 +657,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - Debug("No connected peers found for " + n.nodeName) + Debug("No connected peers found for %s", n.nodeName) return nil, nil } @@ -689,39 +689,32 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { return err } - timeout := 3 * time.Second - Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s, timeout: %v", n.nodeName, pubsubTopic, timeout) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - var resp = C.allocResp(nil) - defer C.freeResp(resp) - - var cPubsubTopic = C.CString(pubsubTopic) - defer C.free(unsafe.Pointer(cPubsubTopic)) - if n.wakuCtx == nil { err := errors.New("wakuCtx is nil") Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) return err } - done := make(chan struct{}) - go func() { - C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - close(done) - }() + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - select { - case <-ctx.Done(): - Error("RelaySubscribe timed out on node %s", n.nodeName) - return errors.New("relay subscribe operation timed out") - case <-done: - if C.getRet(resp) == C.RET_OK { - Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - return nil - } + wg := sync.WaitGroup{} + wg.Add(1) + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPubsubTopic = C.CString(pubsubTopic) + + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + + Debug("Waiting for response from cGoWakuRelaySubscribe on node %s", n.nodeName) + wg.Wait() // Ensures the function completes before proceeding + + if C.getRet(resp) == C.RET_OK { + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + return nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -1099,7 +1092,7 @@ func (n *WakuNode) Destroy() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - Debug("Successfully destroyed " + n.nodeName) + Debug("Successfully destroyed %s", n.nodeName) return nil } @@ -1609,3 +1602,44 @@ func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessa Debug("Successfully created a default WakuMessage on node %s", n.nodeName) return defaultMessage } + +func WaitForAutoConnection(nodeList []*WakuNode) error { + Debug("Waiting for auto-connection of nodes...") + + var hardWait = 30 * time.Second + Debug("Applying hard wait of %v seconds before checking connections", hardWait.Seconds()) + time.Sleep(hardWait) + + for _, node := range nodeList { + peers, err := node.GetConnectedPeers() + if err != nil { + Error("Failed to get connected peers for node %s: %v", node.nodeName, err) + return err + } + + if len(peers) < 1 { + Error("Node %s has no connected peers, expected at least 1", node.nodeName) + return errors.New("expected at least one connected peer") + } + + Debug("Node %s has %d connected peers", node.nodeName, len(peers)) + } + + Debug("Auto-connection check completed successfully") + return nil +} + +func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { + for _, node := range nodes { + Debug("Subscribing node %s to topic %s", node.nodeName, topic) + err := RetryWithBackOff(func() error { + return node.RelaySubscribe(topic) + }) + if err != nil { + Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) + return err + } + Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) + } + return nil +} diff --git a/waku/relay_test.go b/waku/relay_test.go index 20c928d..4f39d96 100644 --- a/waku/relay_test.go +++ b/waku/relay_test.go @@ -11,53 +11,116 @@ import ( ) func TestRelaySubscribeToDefaultTopic(t *testing.T) { - Debug("Starting test to verify relay subscription to the default pubsub topic") + Debug("Starting test to verify relay subscription with multiple nodes via Discv5") - wakuConfig := DefaultWakuConfig - wakuConfig.Relay = true + // Configure Node1 + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + // Retrieve Node1's ENR for Discv5 bootstrapping + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + // Configure Node2 with Node1 as its Discv5 bootstrap node + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + // Configure Node3 with Node1 as its Discv5 bootstrap node + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node3 with Node1 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") - Debug("Creating a Waku node with relay enabled") - node, err := StartWakuNode("TestNode", &wakuConfig) - require.NoError(t, err) defer func() { - Debug("Stopping and destroying the Waku node") - node.StopAndDestroy() + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() }() defaultPubsubTopic := DefaultPubsubTopic Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) - Debug("Attempting to subscribe to the default pubsub topic %s", defaultPubsubTopic) + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Subscribing Node1 to the default pubsub topic: %s", defaultPubsubTopic) err = RetryWithBackOff(func() error { - return node.RelaySubscribe(defaultPubsubTopic) + return node1.RelaySubscribe(defaultPubsubTopic) }) require.NoError(t, err) - Debug("Test successfully verified subscription to the default pubsub topic: %s", defaultPubsubTopic) + Debug("Subscribing Node2 to the default pubsub topic: %s", defaultPubsubTopic) + err = RetryWithBackOff(func() error { + return node2.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Subscribing Node3 to the default pubsub topic: %s", defaultPubsubTopic) + err = RetryWithBackOff(func() error { + return node3.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Waiting for peer connections to stabilize") + time.Sleep(10 * time.Second) + + Debug("Fetching number of connected peers for Node1") + peers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of connected relay peers for Node1") + + Debug("Total number of connected relay peers for Node1: %d", peers) + + if peers != 2 { + Error("Expected Node1 to have exactly 2 relay peers, but got %d", peers) + t.FailNow() + } + + Debug("Test successfully verified that Node1 has 2 relay peers after Node2 and Node3 subscribed") } func TestRelayMessageTransmission(t *testing.T) { Debug("Starting TestRelayMessageTransmission") + Debug("Creating Sender Node with Relay enabled") senderConfig := DefaultWakuConfig senderConfig.Relay = true + senderNode, err := StartWakuNode("SenderNode", &senderConfig) - require.NoError(t, err) + require.NoError(t, err, "Failed to start SenderNode") defer senderNode.StopAndDestroy() + Debug("Creating Receiver Node with Relay enabled") receiverConfig := DefaultWakuConfig receiverConfig.Relay = true + + // Set the Receiver Node's discovery bootstrap node as SenderNode + enrSender, err := senderNode.ENR() + require.NoError(t, err, "Failed to get ENR for SenderNode") + receiverConfig.Discv5BootstrapNodes = []string{enrSender.String()} + receiverNode, err := StartWakuNode("ReceiverNode", &receiverConfig) - require.NoError(t, err) + require.NoError(t, err, "Failed to start ReceiverNode") defer receiverNode.StopAndDestroy() - Debug("Connecting sender and receiver") - err = RetryWithBackOff(func() error { - return senderNode.ConnectPeer(receiverNode) - }) - require.NoError(t, err) + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") - Debug("Subscribing receiver to the default pubsub topic") + Debug("Subscribing ReceiverNode to the default pubsub topic") defaultPubsubTopic := DefaultPubsubTopic err = RetryWithBackOff(func() error { return receiverNode.RelaySubscribe(defaultPubsubTopic) @@ -67,6 +130,7 @@ func TestRelayMessageTransmission(t *testing.T) { Debug("Creating and publishing message") message := senderNode.CreateMessage() var msgHash string + err = RetryWithBackOff(func() error { var err error msgHashObj, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) @@ -74,7 +138,6 @@ func TestRelayMessageTransmission(t *testing.T) { msgHash = msgHashObj.String() } return err - }) require.NoError(t, err) require.NotEmpty(t, msgHash) @@ -87,7 +150,7 @@ func TestRelayMessageTransmission(t *testing.T) { msgHashObj, _ := common.ToMessageHash(msgHash) return receiverNode.VerifyMessageReceived(message, msgHashObj) }) - require.NoError(t, err, "message verification failed") + require.NoError(t, err, "Message verification failed") Debug("TestRelayMessageTransmission completed successfully") } @@ -104,21 +167,25 @@ func TestRelayMessageBroadcast(t *testing.T) { Debug("Creating node %s", nodeNames[i]) nodeConfig := DefaultWakuConfig nodeConfig.Relay = true + if i > 0 { + enrPrevNode, err := nodes[i-1].ENR() + require.NoError(t, err, "Failed to get ENR for node %s", nodeNames[i-1]) + nodeConfig.Discv5BootstrapNodes = []string{enrPrevNode.String()} + } node, err := StartWakuNode(nodeNames[i], &nodeConfig) require.NoError(t, err) defer node.StopAndDestroy() nodes[i] = node } - err := ConnectAllPeers(nodes) - require.NoError(t, err) - Debug("Subscribing nodes to the default pubsub topic") for _, node := range nodes { err := node.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) } + WaitForAutoConnection(nodes) + senderNode := nodes[0] Debug("SenderNode is publishing a message") message := senderNode.CreateMessage() @@ -142,43 +209,49 @@ func TestRelayMessageBroadcast(t *testing.T) { func TestSendmsgInvalidPayload(t *testing.T) { Debug("Starting TestInvalidMessageFormat") - nodeNames := []string{"SenderNode", "PeerNode1"} defaultPubsubTopic := DefaultPubsubTopic Debug("Creating nodes") senderNodeConfig := DefaultWakuConfig senderNodeConfig.Relay = true - senderNode, err := StartWakuNode(nodeNames[0], &senderNodeConfig) + senderNode, err := StartWakuNode("SenderNode", &senderNodeConfig) require.NoError(t, err) defer senderNode.StopAndDestroy() receiverNodeConfig := DefaultWakuConfig receiverNodeConfig.Relay = true - receiverNode, err := StartWakuNode(nodeNames[1], &receiverNodeConfig) + enrNode2, err := senderNode.ENR() + if err != nil { + require.Error(t, err, "Can't find node ENR") + } + receiverNodeConfig.Discv5BootstrapNodes = []string{enrNode2.String()} + receiverNode, err := StartWakuNode("receiverNode", &receiverNodeConfig) require.NoError(t, err) defer receiverNode.StopAndDestroy() - Debug("Connecting SenderNode and PeerNode1") - err = senderNode.ConnectPeer(receiverNode) - require.NoError(t, err) - Debug("Subscribing SenderNode to the default pubsub topic") err = senderNode.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) + err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + Debug("SenderNode is publishing an invalid message") invalidMessage := &pb.WakuMessage{ - Payload: []byte{}, // Empty payload - Version: proto.Uint32(0), + Payload: []byte{}, + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), } message := senderNode.CreateMessage(invalidMessage) - msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + var msgHash common.MessageHash + msgHash, err = senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) Debug("Verifying if message was sent or failed") if err != nil { Debug("Message was not sent due to invalid format: %v", err) - require.Error(t, err, "message should fail due to invalid format") + } else { Debug("Message was unexpectedly sent: %s", msgHash.String()) require.Fail(t, "message with invalid format should not be sent") @@ -187,48 +260,287 @@ func TestSendmsgInvalidPayload(t *testing.T) { Debug("TestInvalidMessageFormat completed") } -func TestMessageNotReceivedWithoutRelay(t *testing.T) { +func TestRelayNodesNotConnectedDirectly(t *testing.T) { Debug("Starting TestMessageNotReceivedWithoutRelay") - Debug("Creating Sender Node with Relay disabled") + Debug("Creating Sender Node with Relay enabled") senderConfig := DefaultWakuConfig senderConfig.Relay = true senderNode, err := StartWakuNode("SenderNode", &senderConfig) require.NoError(t, err) defer senderNode.StopAndDestroy() - Debug("Creating Receiver Node with Relay disabled") - receiverConfig := DefaultWakuConfig - receiverConfig.Relay = true - receiverNode, err := StartWakuNode("ReceiverNode", &receiverConfig) + Debug("Creating Relay-Enabled Receiver Node (Node2)") + node2Config := DefaultWakuConfig + node2Config.Relay = true + enrNode, err := senderNode.ENR() + if err != nil { + require.Error(t, err, "Can't find node ENR") + } + node2Config.Discv5BootstrapNodes = []string{enrNode.String()} + node2, err := StartWakuNode("Node2", &node2Config) require.NoError(t, err) - defer receiverNode.StopAndDestroy() + defer node2.StopAndDestroy() - Debug("Connecting Sender and Receiver") - err = senderNode.ConnectPeer(receiverNode) + Debug("Creating Non-Relay Receiver Node (Node3)") + node3Config := DefaultWakuConfig + node3Config.Relay = true + enrNode2, err := node2.ENR() + if err != nil { + require.Error(t, err, "Can't find node ENR") + } + node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + node3, err := StartWakuNode("Node3", &node3Config) require.NoError(t, err) + defer node3.StopAndDestroy() - Debug("Subscribing Receiver to the default pubsub topic") + Debug("Subscribing Node2 to the default pubsub topic") defaultPubsubTopic := DefaultPubsubTopic - err = senderNode.RelaySubscribe(defaultPubsubTopic) + err = node2.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) - err = receiverNode.RelaySubscribe(defaultPubsubTopic) + + Debug("Subscribing Node2 to the default pubsub topic") + err = node3.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) + Debug("Waiting for nodes to auto-connect before proceeding") + err = WaitForAutoConnection([]*WakuNode{senderNode, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + Debug("SenderNode is publishing a message") message := senderNode.CreateMessage() msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) require.NoError(t, err) require.NotEmpty(t, msgHash) - Debug("Waiting to ensure message delivery") - time.Sleep(3 * time.Second) + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") - Debug("Verifying that Receiver did NOT receive the message") - err = receiverNode.VerifyMessageReceived(message, msgHash) - if err == nil { - t.Fatalf("Test failed: ReceiverNode SHOULD NOT have received the message") - } + Debug("Verifying that Node3 receive the message") + err = node3.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node3 should have received the message") Debug("TestMessageNotReceivedWithoutRelay completed successfully") } + +func TestRelaySubscribeAndPeerCountChange(t *testing.T) { + Debug("Starting test to verify relay subscription and peer count change after stopping a node") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + //enrNode2, err := node2.ENR() + //require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Subscribing Node1 to the default pubsub topic: %s", defaultPubsubTopic) + err = RetryWithBackOff(func() error { + return node1.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Subscribing Node2 to the default pubsub topic: %s", defaultPubsubTopic) + err = RetryWithBackOff(func() error { + return node2.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Subscribing Node3 to the default pubsub topic: %s", defaultPubsubTopic) + err = RetryWithBackOff(func() error { + return node3.RelaySubscribe(defaultPubsubTopic) + }) + require.NoError(t, err) + + Debug("Waiting for peer connections to stabilize") + time.Sleep(10 * time.Second) + + Debug("Fetching number of connected peers for Node1") + peersBeforeStopping, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of connected relay peers for Node1") + + Debug("Total number of connected relay peers for Node1 before stopping Node3: %d", peersBeforeStopping) + require.Equal(t, 2, peersBeforeStopping, "Expected Node1 to have exactly 2 relay peers before stopping Node3") + Debug("Stopping Node3") + node3.StopAndDestroy() + + Debug("Waiting for network to update after Node3 stops") + time.Sleep(10 * time.Second) + + Debug("Fetching number of connected peers for Node1 after stopping Node3") + peersAfterStopping, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of connected relay peers for Node1 after stopping Node3") + + Debug("Total number of connected relay peers for Node1 after stopping Node3: %d", peersAfterStopping) + + require.Equal(t, 1, peersAfterStopping, "Expected Node1 to have exactly 1 relay peer after stopping Node3") + Debug("Test successfully verified peer count changes as expected after stopping Node3") +} + +func TestDiscv5PeerMeshCount(t *testing.T) { + Debug("Starting test to verify peer count in mesh using Discv5 after topic subscription") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node2Config.Relay = true + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node3Config.Relay = true + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe all nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in mesh for Node1 before stopping Node3") + peerCountBefore, err := node1.GetNumPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3") + + Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", peerCountBefore) + require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 3 peers in the mesh before stopping Node3") + + Debug("Stopping Node3") + node3.StopAndDestroy() + + Debug("Waiting for network update after Node3 stops") + time.Sleep(10 * time.Second) + + Debug("Fetching number of peers in mesh for Node1 after stopping Node3") + peerCountAfter, err := node1.GetNumPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3") + + Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", peerCountAfter) + require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 2 peers in the mesh after stopping Node3") + + Debug("Test successfully verified peer count change after stopping Node3") +} + +func TestDiscv5GetPeersConnected(t *testing.T) { + Debug("Starting test to verify peer count in mesh with 4 nodes using Discv5 (Chained Connection)") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + enrNode2, err := node2.ENR() + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + enrNode3, err := node3.ENR() + require.NoError(t, err, "Failed to get ENR for Node3") + + node4Config := DefaultWakuConfig + node4Config.Relay = true + node4Config.Discv5BootstrapNodes = []string{enrNode3.String()} + + Debug("Creating Node4 with Node3 as Discv5 bootstrap") + node4, err := StartWakuNode("Node4", &node4Config) + require.NoError(t, err, "Failed to start Node4") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + node4.StopAndDestroy() + }() + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3, node4}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in mesh for Node1") + peerCount, err := node1.GetNumConnectedPeers() + require.NoError(t, err, "Failed to get number of peers in mesh for Node1") + + Debug("Total number of peers in mesh for Node1: %d", peerCount) + require.Equal(t, 3, peerCount, "Expected Node1 to have exactly 3 peers in the mesh") + + Debug("Test successfully verified peer count in mesh with 4 nodes using Discv5 (Chained Connection)") +} From 02781be64fcc78dd4785d43a1e8ed106c040339a Mon Sep 17 00:00:00 2001 From: aya Date: Mon, 17 Feb 2025 08:28:08 +0200 Subject: [PATCH 08/12] Adding more relay tests --- waku/relay_test.go | 172 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 166 insertions(+), 6 deletions(-) diff --git a/waku/relay_test.go b/waku/relay_test.go index 4f39d96..9257d41 100644 --- a/waku/relay_test.go +++ b/waku/relay_test.go @@ -10,7 +10,7 @@ import ( "google.golang.org/protobuf/proto" ) -func TestRelaySubscribeToDefaultTopic(t *testing.T) { +func TestNumRelayPeers(t *testing.T) { Debug("Starting test to verify relay subscription with multiple nodes via Discv5") // Configure Node1 @@ -345,7 +345,8 @@ func TestRelaySubscribeAndPeerCountChange(t *testing.T) { node2, err := StartWakuNode("Node2", &node2Config) require.NoError(t, err, "Failed to start Node2") - //enrNode2, err := node2.ENR() + //enrNode2, err := node2.ENR() // commented till confirm from Gabriel + //if num peers shall be affected by it or not //require.NoError(t, err, "Failed to get ENR for Node2") node3Config := DefaultWakuConfig @@ -462,7 +463,7 @@ func TestDiscv5PeerMeshCount(t *testing.T) { require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3") Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", peerCountBefore) - require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 3 peers in the mesh before stopping Node3") + require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3") Debug("Stopping Node3") node3.StopAndDestroy() @@ -475,7 +476,7 @@ func TestDiscv5PeerMeshCount(t *testing.T) { require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3") Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", peerCountAfter) - require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 2 peers in the mesh after stopping Node3") + require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3") Debug("Test successfully verified peer count change after stopping Node3") } @@ -535,12 +536,171 @@ func TestDiscv5GetPeersConnected(t *testing.T) { err = WaitForAutoConnection([]*WakuNode{node1, node2, node3, node4}) require.NoError(t, err, "Nodes did not auto-connect within timeout") - Debug("Fetching number of peers in mesh for Node1") + Debug("Fetching number of peers in connected to Node1") peerCount, err := node1.GetNumConnectedPeers() require.NoError(t, err, "Failed to get number of peers in mesh for Node1") - Debug("Total number of peers in mesh for Node1: %d", peerCount) + Debug("Total number of peers connected to Node1: %d", peerCount) require.Equal(t, 3, peerCount, "Expected Node1 to have exactly 3 peers in the mesh") Debug("Test successfully verified peer count in mesh with 4 nodes using Discv5 (Chained Connection)") } + +func TestRelaySubscribeFailsWhenRelayDisabled(t *testing.T) { + Debug("Starting test to verify that subscribing to a topic fails when Relay is disabled") + + nodeConfig := DefaultWakuConfig + nodeConfig.Relay = false + + Debug("Creating Node with Relay disabled") + node, err := StartWakuNode("TestNode", &nodeConfig) + require.NoError(t, err, "Failed to start Node") + + defer func() { + Debug("Stopping and destroying the Waku node") + node.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Attempting to subscribe to the default pubsub topic: %s", defaultPubsubTopic) + + err = node.RelaySubscribe(defaultPubsubTopic) + + Debug("Verifying that subscription failed") + require.Error(t, err, "Expected RelaySubscribe to return an error when Relay is disabled") + + Debug("Test successfully verified that RelaySubscribe fails when Relay is disabled") +} + +func TestRelayDisabledNodeDoesNotReceiveMessages(t *testing.T) { + Debug("Starting test to verify that a node with Relay disabled does not receive messages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + enrNode2, err := node2.ENR() + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Creating and publishing message from Node1") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting to ensure message delivery") + time.Sleep(3 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Verifying that Node3 did NOT receive the message") + err = node3.VerifyMessageReceived(message, msgHash) + require.Error(t, err, "Node3 should NOT have received the message") + + Debug("Test successfully verified that Node3 did not receive the message") +} + +func TestPublishWithLargePayload(t *testing.T) { + Debug("Starting test to verify message publishing with a payload close to 150KB") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + payloadLength := 1024 * 100 // 100KB raw, approximately 150KB when base64 encoded + Debug("Generating a large payload of %d bytes", payloadLength) + + largePayload := make([]byte, payloadLength) + for i := range largePayload { + largePayload[i] = 'a' + } + + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: largePayload, + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + Debug("Publishing message from Node1 with large payload") + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting to ensure message propagation") + time.Sleep(2 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Test successfully verified message publishing with a large payload") +} From e53eb9d5835131f880e45f8cb70ea7cdb931be57 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 19 Feb 2025 09:21:59 +0200 Subject: [PATCH 09/12] Fix review points --- waku/nodes_basic_test.go | 1 - waku/nwaku.go | 119 ++--------- waku/nwaku_test_utils.go | 98 +++++++++ waku/peer_connections_test.go | 150 +++++++++++++- waku/relay_test.go | 361 ++++++++++------------------------ waku/test_data.go | 12 +- 6 files changed, 363 insertions(+), 378 deletions(-) diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go index 92cbe08..43489c8 100644 --- a/waku/nodes_basic_test.go +++ b/waku/nodes_basic_test.go @@ -19,7 +19,6 @@ func TestBasicWakuNodes(t *testing.T) { // Use defer to ensure proper cleanup defer func() { - Debug("Stopping and destroying Node") node.StopAndDestroy() }() diff --git a/waku/nwaku.go b/waku/nwaku.go index c7d269e..26969ad 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -334,7 +334,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" "go.uber.org/zap" - "google.golang.org/protobuf/proto" ) const requestTimeout = 30 * time.Second @@ -683,6 +682,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { } func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { + wg := sync.WaitGroup{} if pubsubTopic == "" { err := errors.New("pubsub topic is empty") Error("Failed to subscribe to relay: %v", err) @@ -696,10 +696,6 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { } Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - - wg := sync.WaitGroup{} - wg.Add(1) - var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) @@ -707,10 +703,9 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { defer C.free(unsafe.Pointer(cPubsubTopic)) Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - - Debug("Waiting for response from cGoWakuRelaySubscribe on node %s", n.nodeName) - wg.Wait() // Ensures the function completes before proceeding + wg.Wait() if C.getRet(resp) == C.RET_OK { Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) @@ -953,7 +948,7 @@ func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage } // Handling context internally with a timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() Debug("Attempting to publish message via relay on node %s", n.nodeName) @@ -1416,19 +1411,24 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { var nodeCfg WakuConfig if customCfg == nil { nodeCfg = DefaultWakuConfig - } else { nodeCfg = *customCfg } - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) - if err != nil { - Error("Failed to allocate unique ports: %v", err) - tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports - } + if nodeCfg.TcpPort == 0 || nodeCfg.Discv5UdpPort == 0 { + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 + } - nodeCfg.TcpPort = tcpPort - nodeCfg.Discv5UdpPort = udpPort + if nodeCfg.TcpPort == 0 { + nodeCfg.TcpPort = tcpPort + } + if nodeCfg.Discv5UdpPort == 0 { + nodeCfg.Discv5UdpPort = udpPort + } + } Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) @@ -1448,6 +1448,7 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { } func (n *WakuNode) StopAndDestroy() error { + Debug("Stopping and destroying Node") if n == nil { err := errors.New("waku node is nil") Error("Failed to stop and destroy: %v", err) @@ -1550,91 +1551,11 @@ func ConnectAllPeers(nodes []*WakuNode) error { return nil } -func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { - timeout := 3 * time.Second - Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - select { - case envelope := <-n.MsgChan: - if envelope == nil { - Error("Received envelope is nil on node %s", n.nodeName) - return errors.New("received envelope is nil") - } - if string(expectedMessage.Payload) != string(envelope.Message().Payload) { - Error("Payload does not match on node %s", n.nodeName) - return errors.New("payload does not match") - } - if expectedMessage.ContentTopic != envelope.Message().ContentTopic { - Error("Content topic does not match on node %s", n.nodeName) - return errors.New("content topic does not match") - } - if expectedHash != envelope.Hash() { - Error("Message hash does not match on node %s", n.nodeName) - return errors.New("message hash does not match") - } - Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) - return nil - case <-ctx.Done(): - Error("Timeout: message not received within %v on node %s", timeout, n.nodeName) - return errors.New("timeout: message not received within the given duration") - } -} - -func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { - Debug("Creating a WakuMessage on node %s", n.nodeName) - - if len(customMessage) > 0 && customMessage[0] != nil { - Debug("Using provided custom message on node %s", n.nodeName) - return customMessage[0] - } - - Debug("Using default message format on node %s", n.nodeName) - defaultMessage := &pb.WakuMessage{ - Payload: []byte("This is a default Waku message payload"), - ContentTopic: "test-content-topic", - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().UnixNano()), - } - - Debug("Successfully created a default WakuMessage on node %s", n.nodeName) - return defaultMessage -} - -func WaitForAutoConnection(nodeList []*WakuNode) error { - Debug("Waiting for auto-connection of nodes...") - - var hardWait = 30 * time.Second - Debug("Applying hard wait of %v seconds before checking connections", hardWait.Seconds()) - time.Sleep(hardWait) - - for _, node := range nodeList { - peers, err := node.GetConnectedPeers() - if err != nil { - Error("Failed to get connected peers for node %s: %v", node.nodeName, err) - return err - } - - if len(peers) < 1 { - Error("Node %s has no connected peers, expected at least 1", node.nodeName) - return errors.New("expected at least one connected peer") - } - - Debug("Node %s has %d connected peers", node.nodeName, len(peers)) - } - - Debug("Auto-connection check completed successfully") - return nil -} - func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { for _, node := range nodes { Debug("Subscribing node %s to topic %s", node.nodeName, topic) - err := RetryWithBackOff(func() error { - return node.RelaySubscribe(topic) - }) + err := node.RelaySubscribe(topic) + if err != nil { Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) return err diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index bb923fe..f3f3aa9 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -1,7 +1,9 @@ package waku import ( + "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -10,6 +12,9 @@ import ( "time" "github.com/cenkalti/backoff/v3" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" ) type NwakuInfo struct { @@ -77,3 +82,96 @@ func RetryWithBackOff(o func() error, options ...BackOffOption) error { b.Reset() return backoff.Retry(o, &b) } + +func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { + Debug("Creating a WakuMessage on node %s", n.nodeName) + + if len(customMessage) > 0 && customMessage[0] != nil { + Debug("Using provided custom message on node %s", n.nodeName) + return customMessage[0] + } + + Debug("Using default message format on node %s", n.nodeName) + defaultMessage := &pb.WakuMessage{ + Payload: []byte("This is a default Waku message payload"), + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + Debug("Successfully created a default WakuMessage on node %s", n.nodeName) + return defaultMessage +} + +func WaitForAutoConnection(nodeList []*WakuNode) error { + Debug("Waiting for auto-connection of nodes...") + + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + + err := RetryWithBackOff(func() error { + for _, node := range nodeList { + peers, err := node.GetConnectedPeers() + if err != nil { + return err + } + + if len(peers) < 1 { + return errors.New("expected at least one connected peer") // Retry + } + + Debug("Node %s has %d connected peers", node.nodeName, len(peers)) + } + + return nil + }, options) + + if err != nil { + Error("Auto-connection failed after retries: %v", err) + return err + } + + Debug("Auto-connection check completed successfully") + return nil +} + +func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash, timeout ...time.Duration) error { + + var verifyTimeout time.Duration + if len(timeout) > 0 { + verifyTimeout = timeout[0] + } else { + verifyTimeout = DefaultTimeOut + } + + Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, verifyTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), verifyTimeout) + defer cancel() + + select { + case envelope := <-n.MsgChan: + if envelope == nil { + Error("Received envelope is nil on node %s", n.nodeName) + return errors.New("received envelope is nil") + } + if string(expectedMessage.Payload) != string(envelope.Message().Payload) { + Error("Payload does not match on node %s", n.nodeName) + return errors.New("payload does not match") + } + if expectedMessage.ContentTopic != envelope.Message().ContentTopic { + Error("Content topic does not match on node %s", n.nodeName) + return errors.New("content topic does not match") + } + if expectedHash != envelope.Hash() { + Error("Message hash does not match on node %s", n.nodeName) + return errors.New("message hash does not match") + } + Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) + return nil + case <-ctx.Done(): + Error("Timeout: message not received within %v on node %s", verifyTimeout, n.nodeName) + return errors.New("timeout: message not received within the given duration") + } +} diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go index 5576a16..4971dc1 100644 --- a/waku/peer_connections_test.go +++ b/waku/peer_connections_test.go @@ -31,8 +31,6 @@ func TestDisconnectPeerNodes(t *testing.T) { require.NoError(t, err, "Failed to get PeerID for Node B") require.True(t, slices.Contains(connectedPeers, nodeBPeerID), "Node B should be a peer of Node A before disconnection") - time.Sleep(3 * time.Second) - Debug("Disconnecting Node A from Node B") err = nodeA.DisconnectPeer(nodeB) require.NoError(t, err, "Failed to disconnect nodes") @@ -78,16 +76,152 @@ func TestConnectMultipleNodesToSingleNode(t *testing.T) { err = node3.ConnectPeer(node1) require.NoError(t, err, "Failed to connect Node 3 to Node 1") - Debug("Verifying connected peers for Node 3") - connectedPeers, err := node3.GetConnectedPeers() - require.NoError(t, err, "Failed to get connected peers for Node 3") - node1PeerID, err := node1.PeerID() + Debug("Verifying connected peers for Node 1") + connectedPeers, err := node1.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node 1") + node3PeerID, err := node3.PeerID() require.NoError(t, err, "Failed to get PeerID for Node 1") node2PeerID, err := node2.PeerID() require.NoError(t, err, "Failed to get PeerID for Node 2") - require.True(t, slices.Contains(connectedPeers, node1PeerID), "Node 1 should be a peer of Node 3") - require.True(t, slices.Contains(connectedPeers, node2PeerID), "Node 2 should be a peer of Node 3") + require.True(t, slices.Contains(connectedPeers, node3PeerID), "Node 3 should be a peer of Node 1") + require.True(t, slices.Contains(connectedPeers, node2PeerID), "Node 2 should be a peer of Node 1") Debug("Test completed successfully: multiple nodes connected to a single node and verified peers") } + +func TestDiscv5PeerMeshCount(t *testing.T) { + Debug("Starting test to verify peer count in mesh using Discv5 after topic subscription") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node2Config.Relay = true + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node3Config.Relay = true + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe all nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in mesh for Node1 before stopping Node3") + peerCountBefore, err := node1.GetNumPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3") + + Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", peerCountBefore) + require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3") + + Debug("Stopping Node3") + node3.StopAndDestroy() + + Debug("Waiting for network update after Node3 stops") + time.Sleep(10 * time.Second) + + Debug("Fetching number of peers in mesh for Node1 after stopping Node3") + peerCountAfter, err := node1.GetNumPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3") + + Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", peerCountAfter) + require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3") + + Debug("Test successfully verified peer count change after stopping Node3") +} + +// this test commented as it will fail will be changed to have external ip in future task +/* +func TestDiscv5GetPeersConnected(t *testing.T) { + Debug("Starting test to verify peer count in mesh with 4 nodes using Discv5 (Chained Connection)") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + enrNode2, err := node2.ENR() + require.NoError(t, err, "Failed to get ENR for Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + enrNode3, err := node3.ENR() + require.NoError(t, err, "Failed to get ENR for Node3") + + node4Config := DefaultWakuConfig + node4Config.Relay = true + node4Config.Discv5BootstrapNodes = []string{enrNode3.String()} + + Debug("Creating Node4 with Node3 as Discv5 bootstrap") + node4, err := StartWakuNode("Node4", &node4Config) + require.NoError(t, err, "Failed to start Node4") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + node4.StopAndDestroy() + }() + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3, node4}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in connected to Node1") + peerCount, err := node1.GetNumConnectedPeers() + require.NoError(t, err, "Failed to get number of peers in mesh for Node1") + + Debug("Total number of peers connected to Node1: %d", peerCount) + require.Equal(t, 3, peerCount, "Expected Node1 to have exactly 3 peers in the mesh") + + Debug("Test successfully verified peer count in mesh with 4 nodes using Discv5 (Chained Connection)") +} +*/ \ No newline at end of file diff --git a/waku/relay_test.go b/waku/relay_test.go index 9257d41..7912736 100644 --- a/waku/relay_test.go +++ b/waku/relay_test.go @@ -1,95 +1,69 @@ package waku import ( + "fmt" "testing" "time" + "github.com/cenkalti/backoff/v3" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) -func TestNumRelayPeers(t *testing.T) { - Debug("Starting test to verify relay subscription with multiple nodes via Discv5") +func TestVerifyNumConnectedRelayPeers(t *testing.T) { - // Configure Node1 - node1Config := DefaultWakuConfig - node1Config.Relay = true + node1Cfg := DefaultWakuConfig + node1Cfg.Relay = true + node1, err := StartWakuNode("node1", &node1Cfg) + if err != nil { + t.Fatalf("Failed to start node1: %v", err) + } + node2Cfg := DefaultWakuConfig + node2Cfg.Relay = true + node2, err := StartWakuNode("node2", &node2Cfg) + if err != nil { + t.Fatalf("Failed to start node2: %v", err) + } - Debug("Creating Node1 with Relay enabled") - node1, err := StartWakuNode("Node1", &node1Config) - require.NoError(t, err, "Failed to start Node1") - - // Retrieve Node1's ENR for Discv5 bootstrapping - enrNode1, err := node1.ENR() - require.NoError(t, err, "Failed to get ENR for Node1") - - // Configure Node2 with Node1 as its Discv5 bootstrap node - node2Config := DefaultWakuConfig - node2Config.Relay = true - node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} - - Debug("Creating Node2 with Node1 as Discv5 bootstrap") - node2, err := StartWakuNode("Node2", &node2Config) - require.NoError(t, err, "Failed to start Node2") - - // Configure Node3 with Node1 as its Discv5 bootstrap node - node3Config := DefaultWakuConfig - node3Config.Relay = true - node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} - - Debug("Creating Node3 with Node1 as Discv5 bootstrap") - node3, err := StartWakuNode("Node3", &node3Config) - require.NoError(t, err, "Failed to start Node3") + node3, err := StartWakuNode("node3", nil) + if err != nil { + t.Fatalf("Failed to start node3: %v", err) + } defer func() { - Debug("Stopping and destroying all Waku nodes") node1.StopAndDestroy() node2.StopAndDestroy() node3.StopAndDestroy() }() - defaultPubsubTopic := DefaultPubsubTopic - Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) - - Debug("Waiting for nodes to auto-connect via Discv5") - err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) - require.NoError(t, err, "Nodes did not auto-connect within timeout") - - Debug("Subscribing Node1 to the default pubsub topic: %s", defaultPubsubTopic) - err = RetryWithBackOff(func() error { - return node1.RelaySubscribe(defaultPubsubTopic) - }) - require.NoError(t, err) - - Debug("Subscribing Node2 to the default pubsub topic: %s", defaultPubsubTopic) - err = RetryWithBackOff(func() error { - return node2.RelaySubscribe(defaultPubsubTopic) - }) - require.NoError(t, err) - - Debug("Subscribing Node3 to the default pubsub topic: %s", defaultPubsubTopic) - err = RetryWithBackOff(func() error { - return node3.RelaySubscribe(defaultPubsubTopic) - }) - require.NoError(t, err) - - Debug("Waiting for peer connections to stabilize") - time.Sleep(10 * time.Second) - - Debug("Fetching number of connected peers for Node1") - peers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) - require.NoError(t, err, "Failed to get number of connected relay peers for Node1") - - Debug("Total number of connected relay peers for Node1: %d", peers) - - if peers != 2 { - Error("Expected Node1 to have exactly 2 relay peers, but got %d", peers) - t.FailNow() + err = node2.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node2 to node1: %v", err) } - Debug("Test successfully verified that Node1 has 2 relay peers after Node2 and Node3 subscribed") + err = node3.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node3 to node1: %v", err) + } + connectedPeersNode1, err := node1.GetConnectedPeers() + if err != nil { + t.Fatalf("Failed to get connected peers for node1: %v", err) + } + if len(connectedPeersNode1) != 2 { + t.Fatalf("Expected 2 connected peers on node1, but got %d", len(connectedPeersNode1)) + } + + numRelayPeers, err := node1.GetNumConnectedRelayPeers() + if err != nil { + t.Fatalf("Failed to get connected relay peers for node1: %v", err) + } + if numRelayPeers != 1 { + t.Fatalf("Expected 1 relay peer on node1, but got %d", numRelayPeers) + } + + t.Logf("Successfully connected node2 and node3 to node1. Relay Peers: %d, Total Peers: %d", numRelayPeers, len(connectedPeersNode1)) } func TestRelayMessageTransmission(t *testing.T) { @@ -122,9 +96,7 @@ func TestRelayMessageTransmission(t *testing.T) { Debug("Subscribing ReceiverNode to the default pubsub topic") defaultPubsubTopic := DefaultPubsubTopic - err = RetryWithBackOff(func() error { - return receiverNode.RelaySubscribe(defaultPubsubTopic) - }) + err = receiverNode.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) Debug("Creating and publishing message") @@ -142,9 +114,6 @@ func TestRelayMessageTransmission(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, msgHash) - Debug("Waiting to ensure message delivery") - time.Sleep(2 * time.Second) - Debug("Verifying message reception") err = RetryWithBackOff(func() error { msgHashObj, _ := common.ToMessageHash(msgHash) @@ -261,7 +230,7 @@ func TestSendmsgInvalidPayload(t *testing.T) { } func TestRelayNodesNotConnectedDirectly(t *testing.T) { - Debug("Starting TestMessageNotReceivedWithoutRelay") + Debug("Starting TestRelayNodesNotConnectedDirectly") Debug("Creating Sender Node with Relay enabled") senderConfig := DefaultWakuConfig @@ -273,23 +242,25 @@ func TestRelayNodesNotConnectedDirectly(t *testing.T) { Debug("Creating Relay-Enabled Receiver Node (Node2)") node2Config := DefaultWakuConfig node2Config.Relay = true - enrNode, err := senderNode.ENR() - if err != nil { - require.Error(t, err, "Can't find node ENR") - } - node2Config.Discv5BootstrapNodes = []string{enrNode.String()} + + // Use static nodes instead of ENR + node1Address, err := senderNode.ListenAddresses() + require.NoError(t, err, "Failed to get sender node address") + node2Config.Staticnodes = []string{node1Address[0].String()} + node2, err := StartWakuNode("Node2", &node2Config) require.NoError(t, err) defer node2.StopAndDestroy() - Debug("Creating Non-Relay Receiver Node (Node3)") + Debug("Creating Relay-Enabled Receiver Node (Node3)") node3Config := DefaultWakuConfig node3Config.Relay = true - enrNode2, err := node2.ENR() - if err != nil { - require.Error(t, err, "Can't find node ENR") - } - node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} + + // Use static nodes instead of Discv5 + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get node2 address") + node3Config.Staticnodes = []string{node2Address[0].String()} + node3, err := StartWakuNode("Node3", &node3Config) require.NoError(t, err) defer node3.StopAndDestroy() @@ -299,13 +270,13 @@ func TestRelayNodesNotConnectedDirectly(t *testing.T) { err = node2.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) - Debug("Subscribing Node2 to the default pubsub topic") + Debug("Subscribing Node3 to the default pubsub topic") err = node3.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) - Debug("Waiting for nodes to auto-connect before proceeding") + Debug("Waiting for nodes to connect before proceeding") err = WaitForAutoConnection([]*WakuNode{senderNode, node2, node3}) - require.NoError(t, err, "Nodes did not auto-connect within timeout") + require.NoError(t, err, "Nodes did not connect within timeout") Debug("SenderNode is publishing a message") message := senderNode.CreateMessage() @@ -317,11 +288,11 @@ func TestRelayNodesNotConnectedDirectly(t *testing.T) { err = node2.VerifyMessageReceived(message, msgHash) require.NoError(t, err, "Node2 should have received the message") - Debug("Verifying that Node3 receive the message") + Debug("Verifying that Node3 received the message") err = node3.VerifyMessageReceived(message, msgHash) - require.NoError(t, err, "Node3 should have received the message") + require.NoError(t, err, "Node3 should have received the message") - Debug("TestMessageNotReceivedWithoutRelay completed successfully") + Debug("TestRelayNodesNotConnectedDirectly completed successfully") } func TestRelaySubscribeAndPeerCountChange(t *testing.T) { @@ -334,26 +305,25 @@ func TestRelaySubscribeAndPeerCountChange(t *testing.T) { node1, err := StartWakuNode("Node1", &node1Config) require.NoError(t, err, "Failed to start Node1") - enrNode1, err := node1.ENR() - require.NoError(t, err, "Failed to get ENR for Node1") + node1Address, err := node1.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node1") node2Config := DefaultWakuConfig node2Config.Relay = true - node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node2Config.Staticnodes = []string{node1Address[0].String()} - Debug("Creating Node2 with Node1 as Discv5 bootstrap") + Debug("Creating Node2 with Node1 as a static node") node2, err := StartWakuNode("Node2", &node2Config) require.NoError(t, err, "Failed to start Node2") - //enrNode2, err := node2.ENR() // commented till confirm from Gabriel - //if num peers shall be affected by it or not - //require.NoError(t, err, "Failed to get ENR for Node2") + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node2") node3Config := DefaultWakuConfig node3Config.Relay = true - node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node3Config.Staticnodes = []string{node2Address[0].String()} - Debug("Creating Node3 with Node2 as Discv5 bootstrap") + Debug("Creating Node3 with Node2 as a static node") node3, err := StartWakuNode("Node3", &node3Config) require.NoError(t, err, "Failed to start Node3") @@ -366,186 +336,55 @@ func TestRelaySubscribeAndPeerCountChange(t *testing.T) { defaultPubsubTopic := DefaultPubsubTopic Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) - Debug("Waiting for nodes to auto-connect via Discv5") + Debug("Waiting for nodes to connect via static node configuration") err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) - require.NoError(t, err, "Nodes did not auto-connect within timeout") + require.NoError(t, err, "Nodes did not connect within timeout") Debug("Subscribing Node1 to the default pubsub topic: %s", defaultPubsubTopic) - err = RetryWithBackOff(func() error { - return node1.RelaySubscribe(defaultPubsubTopic) - }) + err = node1.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) Debug("Subscribing Node2 to the default pubsub topic: %s", defaultPubsubTopic) - err = RetryWithBackOff(func() error { - return node2.RelaySubscribe(defaultPubsubTopic) - }) + err = node2.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) Debug("Subscribing Node3 to the default pubsub topic: %s", defaultPubsubTopic) - err = RetryWithBackOff(func() error { - return node3.RelaySubscribe(defaultPubsubTopic) - }) + err = node3.RelaySubscribe(defaultPubsubTopic) require.NoError(t, err) Debug("Waiting for peer connections to stabilize") - time.Sleep(10 * time.Second) + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 10 * time.Second // Only set the max wait time + } + require.NoError(t, RetryWithBackOff(func() error { + numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + if err != nil { + return err + } + if numPeers != 2 { + return fmt.Errorf("expected 2 relay peers, got %d", numPeers) + } + return nil + }, options), "Peers did not stabilize in time") - Debug("Fetching number of connected peers for Node1") - peersBeforeStopping, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) - require.NoError(t, err, "Failed to get number of connected relay peers for Node1") - - Debug("Total number of connected relay peers for Node1 before stopping Node3: %d", peersBeforeStopping) - require.Equal(t, 2, peersBeforeStopping, "Expected Node1 to have exactly 2 relay peers before stopping Node3") Debug("Stopping Node3") node3.StopAndDestroy() Debug("Waiting for network to update after Node3 stops") - time.Sleep(10 * time.Second) + require.NoError(t, RetryWithBackOff(func() error { + numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) + if err != nil { + return err + } + if numPeers != 1 { + return fmt.Errorf("expected 1 relay peer after stopping Node3, got %d", numPeers) + } + return nil + }, options), "Peer count did not update after stopping Node3") - Debug("Fetching number of connected peers for Node1 after stopping Node3") - peersAfterStopping, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic) - require.NoError(t, err, "Failed to get number of connected relay peers for Node1 after stopping Node3") - - Debug("Total number of connected relay peers for Node1 after stopping Node3: %d", peersAfterStopping) - - require.Equal(t, 1, peersAfterStopping, "Expected Node1 to have exactly 1 relay peer after stopping Node3") Debug("Test successfully verified peer count changes as expected after stopping Node3") } -func TestDiscv5PeerMeshCount(t *testing.T) { - Debug("Starting test to verify peer count in mesh using Discv5 after topic subscription") - - node1Config := DefaultWakuConfig - node1Config.Relay = true - Debug("Creating Node1") - node1, err := StartWakuNode("Node1", &node1Config) - require.NoError(t, err, "Failed to start Node1") - - enrNode1, err := node1.ENR() - require.NoError(t, err, "Failed to get ENR for Node1") - - node2Config := DefaultWakuConfig - node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} - node2Config.Relay = true - Debug("Creating Node2 with Node1 as Discv5 bootstrap") - node2, err := StartWakuNode("Node2", &node2Config) - require.NoError(t, err, "Failed to start Node2") - - require.NoError(t, err, "Failed to get ENR for Node2") - - node3Config := DefaultWakuConfig - node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} - node3Config.Relay = true - - Debug("Creating Node3 with Node2 as Discv5 bootstrap") - node3, err := StartWakuNode("Node3", &node3Config) - require.NoError(t, err, "Failed to start Node3") - - defer func() { - Debug("Stopping and destroying all Waku nodes") - node1.StopAndDestroy() - node2.StopAndDestroy() - }() - - defaultPubsubTopic := DefaultPubsubTopic - Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) - - err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic) - require.NoError(t, err, "Failed to subscribe all nodes to the topic") - - Debug("Waiting for nodes to auto-connect via Discv5") - err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) - require.NoError(t, err, "Nodes did not auto-connect within timeout") - - Debug("Fetching number of peers in mesh for Node1 before stopping Node3") - peerCountBefore, err := node1.GetNumPeersInMesh(defaultPubsubTopic) - require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3") - - Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", peerCountBefore) - require.Equal(t, 2, peerCountBefore, "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3") - - Debug("Stopping Node3") - node3.StopAndDestroy() - - Debug("Waiting for network update after Node3 stops") - time.Sleep(10 * time.Second) - - Debug("Fetching number of peers in mesh for Node1 after stopping Node3") - peerCountAfter, err := node1.GetNumPeersInMesh(defaultPubsubTopic) - require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3") - - Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", peerCountAfter) - require.Equal(t, 1, peerCountAfter, "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3") - - Debug("Test successfully verified peer count change after stopping Node3") -} - -func TestDiscv5GetPeersConnected(t *testing.T) { - Debug("Starting test to verify peer count in mesh with 4 nodes using Discv5 (Chained Connection)") - - node1Config := DefaultWakuConfig - node1Config.Relay = true - - Debug("Creating Node1") - node1, err := StartWakuNode("Node1", &node1Config) - require.NoError(t, err, "Failed to start Node1") - - enrNode1, err := node1.ENR() - require.NoError(t, err, "Failed to get ENR for Node1") - - node2Config := DefaultWakuConfig - node2Config.Relay = true - node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} - - Debug("Creating Node2 with Node1 as Discv5 bootstrap") - node2, err := StartWakuNode("Node2", &node2Config) - require.NoError(t, err, "Failed to start Node2") - - enrNode2, err := node2.ENR() - require.NoError(t, err, "Failed to get ENR for Node2") - - node3Config := DefaultWakuConfig - node3Config.Relay = true - node3Config.Discv5BootstrapNodes = []string{enrNode2.String()} - - Debug("Creating Node3 with Node2 as Discv5 bootstrap") - node3, err := StartWakuNode("Node3", &node3Config) - require.NoError(t, err, "Failed to start Node3") - - enrNode3, err := node3.ENR() - require.NoError(t, err, "Failed to get ENR for Node3") - - node4Config := DefaultWakuConfig - node4Config.Relay = true - node4Config.Discv5BootstrapNodes = []string{enrNode3.String()} - - Debug("Creating Node4 with Node3 as Discv5 bootstrap") - node4, err := StartWakuNode("Node4", &node4Config) - require.NoError(t, err, "Failed to start Node4") - - defer func() { - Debug("Stopping and destroying all Waku nodes") - node1.StopAndDestroy() - node2.StopAndDestroy() - node3.StopAndDestroy() - node4.StopAndDestroy() - }() - - Debug("Waiting for nodes to auto-connect via Discv5") - err = WaitForAutoConnection([]*WakuNode{node1, node2, node3, node4}) - require.NoError(t, err, "Nodes did not auto-connect within timeout") - - Debug("Fetching number of peers in connected to Node1") - peerCount, err := node1.GetNumConnectedPeers() - require.NoError(t, err, "Failed to get number of peers in mesh for Node1") - - Debug("Total number of peers connected to Node1: %d", peerCount) - require.Equal(t, 3, peerCount, "Expected Node1 to have exactly 3 peers in the mesh") - - Debug("Test successfully verified peer count in mesh with 4 nodes using Discv5 (Chained Connection)") -} - func TestRelaySubscribeFailsWhenRelayDisabled(t *testing.T) { Debug("Starting test to verify that subscribing to a topic fails when Relay is disabled") diff --git a/waku/test_data.go b/waku/test_data.go index bdb139c..dd08f68 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -8,13 +8,6 @@ var DefaultWakuConfig WakuConfig func init() { - udpPort, _, err1 := GetFreePortIfNeeded(0, 0) - tcpPort, _, err2 := GetFreePortIfNeeded(0, 0) - - if err1 != nil || err2 != nil { - Error("Failed to get free ports %v %v", err1, err2) - } - DefaultWakuConfig = WakuConfig{ Relay: false, LogLevel: "DEBUG", @@ -25,12 +18,13 @@ func init() { Store: false, Filter: false, Lightpush: false, - Discv5UdpPort: udpPort, - TcpPort: tcpPort, + Discv5UdpPort: 0, + TcpPort: 0, } } const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node +const DefaultTimeOut = 3 * time.Second var DefaultPubsubTopic = "/waku/2/rs/16/64" var ( From 2f789647a25e30907ae94c0de65db8fc2c158e47 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 19 Feb 2025 11:40:37 +0200 Subject: [PATCH 10/12] fix review points --- waku/relay_test.go | 40 ++-------------------------------------- 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/waku/relay_test.go b/waku/relay_test.go index 7912736..0315029 100644 --- a/waku/relay_test.go +++ b/waku/relay_test.go @@ -94,18 +94,13 @@ func TestRelayMessageTransmission(t *testing.T) { err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode}) require.NoError(t, err, "Nodes did not auto-connect within timeout") - Debug("Subscribing ReceiverNode to the default pubsub topic") - defaultPubsubTopic := DefaultPubsubTopic - err = receiverNode.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - Debug("Creating and publishing message") message := senderNode.CreateMessage() var msgHash string err = RetryWithBackOff(func() error { var err error - msgHashObj, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + msgHashObj, err := senderNode.RelayPublishNoCTX(DefaultPubsubTopic, message) if err == nil { msgHash = msgHashObj.String() } @@ -147,12 +142,6 @@ func TestRelayMessageBroadcast(t *testing.T) { nodes[i] = node } - Debug("Subscribing nodes to the default pubsub topic") - for _, node := range nodes { - err := node.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - } - WaitForAutoConnection(nodes) senderNode := nodes[0] @@ -198,10 +187,6 @@ func TestSendmsgInvalidPayload(t *testing.T) { require.NoError(t, err) defer receiverNode.StopAndDestroy() - Debug("Subscribing SenderNode to the default pubsub topic") - err = senderNode.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - err = WaitForAutoConnection([]*WakuNode{senderNode, receiverNode}) require.NoError(t, err, "Nodes did not auto-connect within timeout") @@ -265,22 +250,13 @@ func TestRelayNodesNotConnectedDirectly(t *testing.T) { require.NoError(t, err) defer node3.StopAndDestroy() - Debug("Subscribing Node2 to the default pubsub topic") - defaultPubsubTopic := DefaultPubsubTopic - err = node2.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - - Debug("Subscribing Node3 to the default pubsub topic") - err = node3.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - Debug("Waiting for nodes to connect before proceeding") err = WaitForAutoConnection([]*WakuNode{senderNode, node2, node3}) require.NoError(t, err, "Nodes did not connect within timeout") Debug("SenderNode is publishing a message") message := senderNode.CreateMessage() - msgHash, err := senderNode.RelayPublishNoCTX(defaultPubsubTopic, message) + msgHash, err := senderNode.RelayPublishNoCTX(DefaultPubsubTopic, message) require.NoError(t, err) require.NotEmpty(t, msgHash) @@ -340,18 +316,6 @@ func TestRelaySubscribeAndPeerCountChange(t *testing.T) { err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) require.NoError(t, err, "Nodes did not connect within timeout") - Debug("Subscribing Node1 to the default pubsub topic: %s", defaultPubsubTopic) - err = node1.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - - Debug("Subscribing Node2 to the default pubsub topic: %s", defaultPubsubTopic) - err = node2.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - - Debug("Subscribing Node3 to the default pubsub topic: %s", defaultPubsubTopic) - err = node3.RelaySubscribe(defaultPubsubTopic) - require.NoError(t, err) - Debug("Waiting for peer connections to stabilize") options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 10 * time.Second // Only set the max wait time From d438c21f79de9267e7aa9aca73c16cbbe47a27f5 Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Wed, 26 Feb 2025 12:21:30 +0200 Subject: [PATCH 11/12] Update waku/nwaku.go Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> --- waku/nwaku.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 26969ad..dab2d1e 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -1415,20 +1415,18 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { nodeCfg = *customCfg } - if nodeCfg.TcpPort == 0 || nodeCfg.Discv5UdpPort == 0 { - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) - if err != nil { - Error("Failed to allocate unique ports: %v", err) - tcpPort, udpPort = 0, 0 - } - - if nodeCfg.TcpPort == 0 { - nodeCfg.TcpPort = tcpPort - } - if nodeCfg.Discv5UdpPort == 0 { - nodeCfg.Discv5UdpPort = udpPort - } - } + tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 + } + + if nodeCfg.TcpPort == 0 { + nodeCfg.TcpPort = tcpPort + } + if nodeCfg.Discv5UdpPort == 0 { + nodeCfg.Discv5UdpPort = udpPort + } Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) From 21eae98e8d822c7eb03e696fa94cceaefb9a9629 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 26 Feb 2025 12:52:30 +0200 Subject: [PATCH 12/12] work on merge conflicts --- waku/nwaku.go | 308 +++++++++++---------------------------- waku/nwaku_test_utils.go | 40 +++++ 2 files changed, 129 insertions(+), 219 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index dab2d1e..03133da 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -56,15 +56,6 @@ package waku // resp must be set != NULL in case interest on retrieving data from the callback void GoCallback(int ret, char* msg, size_t len, void* resp); - #define WAKU_CALL(call) \ - do { \ - int ret = call; \ - if (ret != 0) { \ - printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ - exit(1); \ - } \ - } while (0) - static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); @@ -72,27 +63,27 @@ package waku } static void cGoWakuStart(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_start(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuStop(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_version(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -118,21 +109,21 @@ package waku char* encoding, void* resp) { - WAKU_CALL( waku_content_topic(wakuCtx, + waku_content_topic(wakuCtx, appName, appVersion, contentTopicName, encoding, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); + waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -141,44 +132,44 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL (waku_relay_publish(wakuCtx, + waku_relay_publish(wakuCtx, pubSubTopic, jsonWakuMessage, timeoutMs, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - WAKU_CALL ( waku_relay_subscribe(wakuCtx, + waku_relay_subscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) { - WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx, + waku_relay_add_protected_shard(wakuCtx, clusterId, shardId, publicKey, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, + waku_relay_unsubscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { - WAKU_CALL( waku_connect(wakuCtx, + waku_connect(wakuCtx, peerMultiAddr, timeoutMs, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuDialPeer(void* wakuCtx, @@ -187,12 +178,12 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL( waku_dial_peer(wakuCtx, + waku_dial_peer(wakuCtx, peerMultiAddr, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuDialPeerById(void* wakuCtx, @@ -201,51 +192,51 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL( waku_dial_peer_by_id(wakuCtx, + waku_dial_peer_by_id(wakuCtx, peerId, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { - WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, + waku_disconnect_peer_by_id(wakuCtx, peerId, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); + waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); + waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); + waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); + waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); + waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); + waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); + waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); + waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -253,11 +244,11 @@ package waku const char* jsonWakuMessage, void* resp) { - WAKU_CALL (waku_lightpush_publish(wakuCtx, + waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuStoreQuery(void* wakuCtx, @@ -266,32 +257,32 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL (waku_store_query(wakuCtx, - jsonQuery, - peerAddr, - timeoutMs, - (WakuCallBack) GoCallback, - resp)); + waku_store_query(wakuCtx, + jsonQuery, + peerAddr, + timeoutMs, + (WakuCallBack) GoCallback, + resp); } static void cGoWakuPeerExchangeQuery(void* wakuCtx, uint64_t numPeers, void* resp) { - WAKU_CALL (waku_peer_exchange_request(wakuCtx, + waku_peer_exchange_request(wakuCtx, numPeers, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, const char* protocol, void* resp) { - WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, + waku_get_peerids_by_protocol(wakuCtx, protocol, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuDnsDiscovery(void* wakuCtx, @@ -300,12 +291,12 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL (waku_dns_discovery(wakuCtx, - entTreeUrl, - nameDnsServer, - timeoutMs, - (WakuCallBack) GoCallback, - resp)); + waku_dns_discovery(wakuCtx, + entTreeUrl, + nameDnsServer, + timeoutMs, + (WakuCallBack) GoCallback, + resp); } */ @@ -325,7 +316,6 @@ import ( "unsafe" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" libp2pproto "github.com/libp2p/go-libp2p/core/protocol" @@ -333,89 +323,12 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" - "go.uber.org/zap" ) const requestTimeout = 30 * time.Second -const MsgChanBufferSize = 100 -const TopicHealthChanBufferSize = 100 -const ConnectionChangeChanBufferSize = 100 - -type WakuConfig struct { - Host string `json:"host,omitempty"` - Nodekey string `json:"nodekey,omitempty"` - Relay bool `json:"relay"` - Store bool `json:"store,omitempty"` - LegacyStore bool `json:"legacyStore"` - Storenode string `json:"storenode,omitempty"` - StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"` - StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"` - StoreMessageDbVacuum bool `json:"storeMessageDbVacuum,omitempty"` - StoreMaxNumDbConnections int `json:"storeMaxNumDbConnections,omitempty"` - StoreResume bool `json:"storeResume,omitempty"` - Filter bool `json:"filter,omitempty"` - Filternode string `json:"filternode,omitempty"` - FilterSubscriptionTimeout int64 `json:"filterSubscriptionTimeout,omitempty"` - FilterMaxPeersToServe uint32 `json:"filterMaxPeersToServe,omitempty"` - FilterMaxCriteria uint32 `json:"filterMaxCriteria,omitempty"` - Lightpush bool `json:"lightpush,omitempty"` - LightpushNode string `json:"lightpushnode,omitempty"` - LogLevel string `json:"logLevel,omitempty"` - DnsDiscovery bool `json:"dnsDiscovery,omitempty"` - DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` - MaxMessageSize string `json:"maxMessageSize,omitempty"` - Staticnodes []string `json:"staticnodes,omitempty"` - Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` - Discv5Discovery bool `json:"discv5Discovery,omitempty"` - Discv5UdpPort int `json:"discv5UdpPort,omitempty"` - ClusterID uint16 `json:"clusterId,omitempty"` - Shards []uint16 `json:"shards,omitempty"` - PeerExchange bool `json:"peerExchange,omitempty"` - PeerExchangeNode string `json:"peerExchangeNode,omitempty"` - TcpPort int `json:"tcpPort,omitempty"` - RateLimits RateLimitsConfig `json:"rateLimits,omitempty"` -} - -type RateLimitsConfig struct { - Filter *RateLimit `json:"-"` - Lightpush *RateLimit `json:"-"` - PeerExchange *RateLimit `json:"-"` -} - -func (rlc RateLimitsConfig) MarshalJSON() ([]byte, error) { - output := []string{} - if rlc.Filter != nil { - output = append(output, fmt.Sprintf("filter:%s", rlc.Filter.String())) - } - if rlc.Lightpush != nil { - output = append(output, fmt.Sprintf("lightpush:%s", rlc.Lightpush.String())) - } - if rlc.PeerExchange != nil { - output = append(output, fmt.Sprintf("px:%s", rlc.PeerExchange.String())) - } - return json.Marshal(output) -} - -type RateLimitTimeUnit string - -const Hour RateLimitTimeUnit = "h" -const Minute RateLimitTimeUnit = "m" -const Second RateLimitTimeUnit = "s" -const Millisecond RateLimitTimeUnit = "ms" - -type RateLimit struct { - Volume int // Number of allowed messages per period - Period int // Length of each rate-limit period (in TimeUnit) - TimeUnit RateLimitTimeUnit // Time unit of the period -} - -func (rl RateLimit) String() string { - return fmt.Sprintf("%d/%d%s", rl.Volume, rl.Period, rl.TimeUnit) -} - -func (rl RateLimit) MarshalJSON() ([]byte, error) { - return json.Marshal(rl.String()) -} +const MsgChanBufferSize = 1024 +const TopicHealthChanBufferSize = 1024 +const ConnectionChangeChanBufferSize = 1024 //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { @@ -432,14 +345,14 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer - config *WakuConfig + config *common.WakuConfig MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange nodeName string } -func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) { +func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) { Debug("Creating new WakuNode: %v", nodeName) n := &WakuNode{ config: config, @@ -512,11 +425,12 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un node.OnEvent(eventStr) } } else { - errMsgField := zap.Skip() if len != 0 { - errMsgField = zap.String("error", C.GoStringN(msg, C.int(len))) + errMsg := C.GoStringN(msg, C.int(len)) + Error("globalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg) + } else { + Error("globalEventCallback retCode not ok, retCode: %v", callerRet) } - log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField) } } @@ -656,7 +570,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - Debug("No connected peers found for %s", n.nodeName) + Debug("No connected peers found for %v", n.nodeName) return nil, nil } @@ -665,44 +579,39 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { for _, peerID := range peerIDs { id, err := peer.Decode(peerID) if err != nil { - Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) return nil, err } peers = append(peers, id) } - Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) + Debug("Successfully fetched connected peers for %v, count: %v", n.nodeName, len(peers)) return peers, nil } errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg) return nil, errors.New(errMsg) } func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { - wg := sync.WaitGroup{} if pubsubTopic == "" { - err := errors.New("pubsub topic is empty") - Error("Failed to subscribe to relay: %v", err) - return err + return errors.New("pubsub topic is empty") } - if n.wakuCtx == nil { - err := errors.New("wakuCtx is nil") - Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) - return err - } + wg := sync.WaitGroup{} - Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + if n.wakuCtx == nil { + return errors.New("wakuCtx is nil") + } + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() @@ -811,6 +720,7 @@ func (n *WakuNode) StartDiscV5() error { Debug("Starting DiscV5 for node: %s", n.nodeName) wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -1092,7 +1002,7 @@ func (n *WakuNode) Destroy() error { } errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to destroy %v: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -1336,7 +1246,7 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } numPeers := len(peers) - Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) + Debug("Successfully fetched number of connected peers for %v, count: %v", n.nodeName, numPeers) return numPeers, nil } @@ -1358,7 +1268,7 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { for i := 0; i < 10; i++ { tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0")) if err != nil { - Warn("unable to resolve tcp addr: %v", zap.Error(err)) + Warn("unable to resolve tcp addr: %v", err) continue } tcpListener, err := net.ListenTCP("tcp", tcpAddr) @@ -1404,29 +1314,29 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { } // Create & start node -func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { +func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, error) { Debug("Initializing %s", nodeName) - var nodeCfg WakuConfig + var nodeCfg common.WakuConfig if customCfg == nil { nodeCfg = DefaultWakuConfig } else { nodeCfg = *customCfg } - tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort) - if err != nil { - Error("Failed to allocate unique ports: %v", err) - tcpPort, udpPort = 0, 0 - } - - if nodeCfg.TcpPort == 0 { - nodeCfg.TcpPort = tcpPort - } - if nodeCfg.Discv5UdpPort == 0 { - nodeCfg.Discv5UdpPort = udpPort - } + tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 + } + + if nodeCfg.TcpPort == 0 { + nodeCfg.TcpPort = tcpPort + } + if nodeCfg.Discv5UdpPort == 0 { + nodeCfg.Discv5UdpPort = udpPort + } Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) @@ -1522,43 +1432,3 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } - -func ConnectAllPeers(nodes []*WakuNode) error { - if len(nodes) == 0 { - Error("Cannot connect peers: node list is empty") - return errors.New("node list is empty") - } - - timeout := time.Duration(len(nodes)*2) * time.Second - Debug("Connecting nodes in a relay chain with timeout: %v", timeout) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - for i := 0; i < len(nodes)-1; i++ { - Debug("Connecting node %d to node %d", i, i+1) - err := nodes[i].ConnectPeer(nodes[i+1]) - if err != nil { - Error("Failed to connect node %d to node %d: %v", i, i+1, err) - return err - } - } - - <-ctx.Done() - Debug("Connections stabilized") - return nil -} - -func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { - for _, node := range nodes { - Debug("Subscribing node %s to topic %s", node.nodeName, topic) - err := node.RelaySubscribe(topic) - - if err != nil { - Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) - return err - } - Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) - } - return nil -} diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index f3f3aa9..5b1bc96 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -175,3 +175,43 @@ func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expect return errors.New("timeout: message not received within the given duration") } } + +func ConnectAllPeers(nodes []*WakuNode) error { + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + <-ctx.Done() + Debug("Connections stabilized") + return nil +} + +func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { + for _, node := range nodes { + Debug("Subscribing node %s to topic %s", node.nodeName, topic) + err := node.RelaySubscribe(topic) + + if err != nil { + Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) + return err + } + Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) + } + return nil +}