From 21eae98e8d822c7eb03e696fa94cceaefb9a9629 Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 26 Feb 2025 12:52:30 +0200 Subject: [PATCH] work on merge conflicts --- waku/nwaku.go | 308 +++++++++++---------------------------- waku/nwaku_test_utils.go | 40 +++++ 2 files changed, 129 insertions(+), 219 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index dab2d1e..03133da 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -56,15 +56,6 @@ package waku // resp must be set != NULL in case interest on retrieving data from the callback void GoCallback(int ret, char* msg, size_t len, void* resp); - #define WAKU_CALL(call) \ - do { \ - int ret = call; \ - if (ret != 0) { \ - printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ - exit(1); \ - } \ - } while (0) - static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); @@ -72,27 +63,27 @@ package waku } static void cGoWakuStart(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_start(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuStop(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_version(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -118,21 +109,21 @@ package waku char* encoding, void* resp) { - WAKU_CALL( waku_content_topic(wakuCtx, + waku_content_topic(wakuCtx, appName, appVersion, contentTopicName, encoding, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); + waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); + waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -141,44 +132,44 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL (waku_relay_publish(wakuCtx, + waku_relay_publish(wakuCtx, pubSubTopic, jsonWakuMessage, timeoutMs, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - WAKU_CALL ( waku_relay_subscribe(wakuCtx, + waku_relay_subscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) { - WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx, + waku_relay_add_protected_shard(wakuCtx, clusterId, shardId, publicKey, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, + waku_relay_unsubscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { - WAKU_CALL( waku_connect(wakuCtx, + waku_connect(wakuCtx, peerMultiAddr, timeoutMs, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuDialPeer(void* wakuCtx, @@ -187,12 +178,12 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL( waku_dial_peer(wakuCtx, + waku_dial_peer(wakuCtx, peerMultiAddr, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuDialPeerById(void* wakuCtx, @@ -201,51 +192,51 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL( waku_dial_peer_by_id(wakuCtx, + waku_dial_peer_by_id(wakuCtx, peerId, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { - WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, + waku_disconnect_peer_by_id(wakuCtx, peerId, (WakuCallBack) GoCallback, - resp) ); + resp); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); + waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); + waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); + waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); + waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); + waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); + waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); + waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); + waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -253,11 +244,11 @@ package waku const char* jsonWakuMessage, void* resp) { - WAKU_CALL (waku_lightpush_publish(wakuCtx, + waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuStoreQuery(void* wakuCtx, @@ -266,32 +257,32 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL (waku_store_query(wakuCtx, - jsonQuery, - peerAddr, - timeoutMs, - (WakuCallBack) GoCallback, - resp)); + waku_store_query(wakuCtx, + jsonQuery, + peerAddr, + timeoutMs, + (WakuCallBack) GoCallback, + resp); } static void cGoWakuPeerExchangeQuery(void* wakuCtx, uint64_t numPeers, void* resp) { - WAKU_CALL (waku_peer_exchange_request(wakuCtx, + waku_peer_exchange_request(wakuCtx, numPeers, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, const char* protocol, void* resp) { - WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, + waku_get_peerids_by_protocol(wakuCtx, protocol, (WakuCallBack) GoCallback, - resp)); + resp); } static void cGoWakuDnsDiscovery(void* wakuCtx, @@ -300,12 +291,12 @@ package waku int timeoutMs, void* resp) { - WAKU_CALL (waku_dns_discovery(wakuCtx, - entTreeUrl, - nameDnsServer, - timeoutMs, - (WakuCallBack) GoCallback, - resp)); + waku_dns_discovery(wakuCtx, + entTreeUrl, + nameDnsServer, + timeoutMs, + (WakuCallBack) GoCallback, + resp); } */ @@ -325,7 +316,6 @@ import ( "unsafe" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" libp2pproto "github.com/libp2p/go-libp2p/core/protocol" @@ -333,89 +323,12 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" - "go.uber.org/zap" ) const requestTimeout = 30 * time.Second -const MsgChanBufferSize = 100 -const TopicHealthChanBufferSize = 100 -const ConnectionChangeChanBufferSize = 100 - -type WakuConfig struct { - Host string `json:"host,omitempty"` - Nodekey string `json:"nodekey,omitempty"` - Relay bool `json:"relay"` - Store bool `json:"store,omitempty"` - LegacyStore bool `json:"legacyStore"` - Storenode string `json:"storenode,omitempty"` - StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"` - StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"` - StoreMessageDbVacuum bool `json:"storeMessageDbVacuum,omitempty"` - StoreMaxNumDbConnections int `json:"storeMaxNumDbConnections,omitempty"` - StoreResume bool `json:"storeResume,omitempty"` - Filter bool `json:"filter,omitempty"` - Filternode string `json:"filternode,omitempty"` - FilterSubscriptionTimeout int64 `json:"filterSubscriptionTimeout,omitempty"` - FilterMaxPeersToServe uint32 `json:"filterMaxPeersToServe,omitempty"` - FilterMaxCriteria uint32 `json:"filterMaxCriteria,omitempty"` - Lightpush bool `json:"lightpush,omitempty"` - LightpushNode string `json:"lightpushnode,omitempty"` - LogLevel string `json:"logLevel,omitempty"` - DnsDiscovery bool `json:"dnsDiscovery,omitempty"` - DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` - MaxMessageSize string `json:"maxMessageSize,omitempty"` - Staticnodes []string `json:"staticnodes,omitempty"` - Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` - Discv5Discovery bool `json:"discv5Discovery,omitempty"` - Discv5UdpPort int `json:"discv5UdpPort,omitempty"` - ClusterID uint16 `json:"clusterId,omitempty"` - Shards []uint16 `json:"shards,omitempty"` - PeerExchange bool `json:"peerExchange,omitempty"` - PeerExchangeNode string `json:"peerExchangeNode,omitempty"` - TcpPort int `json:"tcpPort,omitempty"` - RateLimits RateLimitsConfig `json:"rateLimits,omitempty"` -} - -type RateLimitsConfig struct { - Filter *RateLimit `json:"-"` - Lightpush *RateLimit `json:"-"` - PeerExchange *RateLimit `json:"-"` -} - -func (rlc RateLimitsConfig) MarshalJSON() ([]byte, error) { - output := []string{} - if rlc.Filter != nil { - output = append(output, fmt.Sprintf("filter:%s", rlc.Filter.String())) - } - if rlc.Lightpush != nil { - output = append(output, fmt.Sprintf("lightpush:%s", rlc.Lightpush.String())) - } - if rlc.PeerExchange != nil { - output = append(output, fmt.Sprintf("px:%s", rlc.PeerExchange.String())) - } - return json.Marshal(output) -} - -type RateLimitTimeUnit string - -const Hour RateLimitTimeUnit = "h" -const Minute RateLimitTimeUnit = "m" -const Second RateLimitTimeUnit = "s" -const Millisecond RateLimitTimeUnit = "ms" - -type RateLimit struct { - Volume int // Number of allowed messages per period - Period int // Length of each rate-limit period (in TimeUnit) - TimeUnit RateLimitTimeUnit // Time unit of the period -} - -func (rl RateLimit) String() string { - return fmt.Sprintf("%d/%d%s", rl.Volume, rl.Period, rl.TimeUnit) -} - -func (rl RateLimit) MarshalJSON() ([]byte, error) { - return json.Marshal(rl.String()) -} +const MsgChanBufferSize = 1024 +const TopicHealthChanBufferSize = 1024 +const ConnectionChangeChanBufferSize = 1024 //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { @@ -432,14 +345,14 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer - config *WakuConfig + config *common.WakuConfig MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange nodeName string } -func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) { +func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) { Debug("Creating new WakuNode: %v", nodeName) n := &WakuNode{ config: config, @@ -512,11 +425,12 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un node.OnEvent(eventStr) } } else { - errMsgField := zap.Skip() if len != 0 { - errMsgField = zap.String("error", C.GoStringN(msg, C.int(len))) + errMsg := C.GoStringN(msg, C.int(len)) + Error("globalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg) + } else { + Error("globalEventCallback retCode not ok, retCode: %v", callerRet) } - log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField) } } @@ -656,7 +570,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - Debug("No connected peers found for %s", n.nodeName) + Debug("No connected peers found for %v", n.nodeName) return nil, nil } @@ -665,44 +579,39 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { for _, peerID := range peerIDs { id, err := peer.Decode(peerID) if err != nil { - Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) return nil, err } peers = append(peers, id) } - Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) + Debug("Successfully fetched connected peers for %v, count: %v", n.nodeName, len(peers)) return peers, nil } errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg) return nil, errors.New(errMsg) } func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { - wg := sync.WaitGroup{} if pubsubTopic == "" { - err := errors.New("pubsub topic is empty") - Error("Failed to subscribe to relay: %v", err) - return err + return errors.New("pubsub topic is empty") } - if n.wakuCtx == nil { - err := errors.New("wakuCtx is nil") - Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) - return err - } + wg := sync.WaitGroup{} - Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + if n.wakuCtx == nil { + return errors.New("wakuCtx is nil") + } + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() @@ -811,6 +720,7 @@ func (n *WakuNode) StartDiscV5() error { Debug("Starting DiscV5 for node: %s", n.nodeName) wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -1092,7 +1002,7 @@ func (n *WakuNode) Destroy() error { } errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to destroy %v: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -1336,7 +1246,7 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } numPeers := len(peers) - Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) + Debug("Successfully fetched number of connected peers for %v, count: %v", n.nodeName, numPeers) return numPeers, nil } @@ -1358,7 +1268,7 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { for i := 0; i < 10; i++ { tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0")) if err != nil { - Warn("unable to resolve tcp addr: %v", zap.Error(err)) + Warn("unable to resolve tcp addr: %v", err) continue } tcpListener, err := net.ListenTCP("tcp", tcpAddr) @@ -1404,29 +1314,29 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { } // Create & start node -func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { +func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, error) { Debug("Initializing %s", nodeName) - var nodeCfg WakuConfig + var nodeCfg common.WakuConfig if customCfg == nil { nodeCfg = DefaultWakuConfig } else { nodeCfg = *customCfg } - tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort) - if err != nil { - Error("Failed to allocate unique ports: %v", err) - tcpPort, udpPort = 0, 0 - } - - if nodeCfg.TcpPort == 0 { - nodeCfg.TcpPort = tcpPort - } - if nodeCfg.Discv5UdpPort == 0 { - nodeCfg.Discv5UdpPort = udpPort - } + tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 + } + + if nodeCfg.TcpPort == 0 { + nodeCfg.TcpPort = tcpPort + } + if nodeCfg.Discv5UdpPort == 0 { + nodeCfg.Discv5UdpPort = udpPort + } Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) @@ -1522,43 +1432,3 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } - -func ConnectAllPeers(nodes []*WakuNode) error { - if len(nodes) == 0 { - Error("Cannot connect peers: node list is empty") - return errors.New("node list is empty") - } - - timeout := time.Duration(len(nodes)*2) * time.Second - Debug("Connecting nodes in a relay chain with timeout: %v", timeout) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - for i := 0; i < len(nodes)-1; i++ { - Debug("Connecting node %d to node %d", i, i+1) - err := nodes[i].ConnectPeer(nodes[i+1]) - if err != nil { - Error("Failed to connect node %d to node %d: %v", i, i+1, err) - return err - } - } - - <-ctx.Done() - Debug("Connections stabilized") - return nil -} - -func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { - for _, node := range nodes { - Debug("Subscribing node %s to topic %s", node.nodeName, topic) - err := node.RelaySubscribe(topic) - - if err != nil { - Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) - return err - } - Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) - } - return nil -} diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index f3f3aa9..5b1bc96 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -175,3 +175,43 @@ func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expect return errors.New("timeout: message not received within the given duration") } } + +func ConnectAllPeers(nodes []*WakuNode) error { + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + <-ctx.Done() + Debug("Connections stabilized") + return nil +} + +func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { + for _, node := range nodes { + Debug("Subscribing node %s to topic %s", node.nodeName, topic) + err := node.RelaySubscribe(topic) + + if err != nil { + Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) + return err + } + Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) + } + return nil +}