From 0940993ce6842b0e86e170e2e1f0ed449b732b8d Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 9 Feb 2025 14:21:38 +0200 Subject: [PATCH] Adding changes to APIs in nwaku.go --- waku/nwaku.go | 215 ++++++++++++++++++++++++++++++++------- waku/nwaku_test_utils.go | 4 +- 2 files changed, 183 insertions(+), 36 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index c9fda3e..4a4ef88 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -437,6 +437,7 @@ type WakuNode struct { MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange + nodeName string } func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { @@ -581,15 +582,15 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { } func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { - var pubsubTopic string - if len(optPubsubTopic) == 0 { - pubsubTopic = "" - } else { + logger := n.logger.Named(n.nodeName) + logger.Debug("Fetching number of connected relay peers for " + n.nodeName) + + pubsubTopic := "" + if len(optPubsubTopic) > 0 { pubsubTopic = optPubsubTopic[0] } wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -604,13 +605,17 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error() - return 0, errors.New(errMsg) + logger.Error("Failed to convert relay peer count for "+n.nodeName, zap.Error(err)) + return 0, err } + + logger.Debug("Successfully fetched number of connected relay peers for "+n.nodeName, zap.Int("count", numPeers)) return numPeers, nil } - errMsg := "error GetNumConnectedRelayPeers: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + + errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + logger.Error("Failed to get number of connected relay peers for "+n.nodeName, zap.String("error", errMsg)) + return 0, errors.New(errMsg) } @@ -634,8 +639,17 @@ func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { } func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - wg := sync.WaitGroup{} + if n == nil { + logger := GetLogger() + err := errors.New("waku node is nil") + logger.Error("Failed to get connected peers", zap.Error(err)) + return nil, err + } + logger := n.logger.Named(n.nodeName) + logger.Debug("Fetching connected peers for " + n.nodeName) + + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -646,23 +660,29 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { + logger.Debug("No connected peers found for " + n.nodeName) return nil, nil } - // peersStr contains a comma-separated list of peer ids - itemsPeerIds := strings.Split(peersStr, ",") + + peerIDs := strings.Split(peersStr, ",") var peers peer.IDSlice - for _, peerId := range itemsPeerIds { - id, err := peer.Decode(peerId) + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) if err != nil { - return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err) + logger.Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) + return nil, err } peers = append(peers, id) } + + logger.Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) return peers, nil } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg) + errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + logger.Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) + + return nil, errors.New(errMsg) } func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { @@ -982,37 +1002,66 @@ func (n *WakuNode) Start() error { } func (n *WakuNode) Stop() error { - wg := sync.WaitGroup{} + if n == nil { + logger := GetLogger() + err := errors.New("waku node is nil") + logger.Error("Failed to stop", zap.Error(err)) + return err + } + logger := n.logger.Named(n.nodeName) + + logger.Debug("Stopping " + n.nodeName) + + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) - C.cGoWakuStop(n.wakuCtx, resp) + C.cGoWakuStop(n.wakuCtx, resp) // Calls the C function to stop the Waku node wg.Wait() + if C.getRet(resp) == C.RET_OK { - unregisterNode(n) + unregisterNode(n) // Ensure the node is properly unregistered + logger.Debug("Successfully stopped " + n.nodeName) return nil } + // Extract error message from C response errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + logger.Error("Failed to stop "+n.nodeName, zap.String("error", errMsg)) + return errors.New(errMsg) } func (n *WakuNode) Destroy() error { - wg := sync.WaitGroup{} + if n == nil { + logger := GetLogger() + err := errors.New("Waku node is nil") + logger.Error("Failed to destroy", zap.Error(err)) + return err + } + logger := n.logger.Named(n.nodeName) + + logger.Debug("Destroying " + n.nodeName) + + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) C.cGoWakuDestroy(n.wakuCtx, resp) wg.Wait() + if C.getRet(resp) == C.RET_OK { + logger.Debug("Successfully destroyed " + n.nodeName) return nil } errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + logger.Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) + return errors.New(errMsg) } @@ -1240,11 +1289,26 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p } func (n *WakuNode) GetNumConnectedPeers() (int, error) { - peers, err := n.GetConnectedPeers() - if err != nil { + if n == nil { + logger := GetLogger() + err := errors.New("waku node is nil") + logger.Error("Failed to get number of connected peers", zap.Error(err)) return 0, err } - return len(peers), nil + + logger := n.logger.Named(n.nodeName) + logger.Debug("Fetching number of connected peers for " + n.nodeName) + + peers, err := n.GetConnectedPeers() + if err != nil { + logger.Error("Failed to fetch connected peers for "+n.nodeName, zap.Error(err)) + return 0, err + } + + numPeers := len(peers) + logger.Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) + + return numPeers, nil } func getContextTimeoutMilliseconds(ctx context.Context) int { @@ -1307,36 +1371,119 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in return tcpPort, discV5UDPPort, nil } -func StartWakuNode(customCfg *WakuConfig) (*WakuNode, error) { +// Create & start node +func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { logrusLogger := GetLogger() - logger := logrusToZap(logrusLogger) + logger := logrusToZap(logrusLogger, nodeName) // Convert logrus to zap with node name - logger.Debug("Initializing Waku node") + logger.Debug("Initializing " + nodeName) var nodeCfg WakuConfig if customCfg == nil { - nodeCfg = DefaultWakuConfig // Use the default configuration + nodeCfg = DefaultWakuConfig } else { nodeCfg = *customCfg } - // Assign dynamically generated ports nodeCfg.Discv5UdpPort = GenerateUniquePort() nodeCfg.TcpPort = GenerateUniquePort() - logger.Debug("Creating Waku node") + logger.Debug("Creating " + nodeName) node, err := NewWakuNode(&nodeCfg, logger) if err != nil { - logger.Error("Failed to create Waku node", zap.Error(err)) + logger.Error("Failed to create "+nodeName, zap.Error(err)) return nil, err } - logger.Debug("Starting Waku node") + node.nodeName = nodeName + + logger.Debug("Starting " + nodeName) if err := node.Start(); err != nil { - logger.Error("Failed to start Waku node", zap.Error(err)) + logger.Error("Failed to start "+nodeName, zap.Error(err)) return nil, err } - logger.Debug("Successfully started Waku node") + logger.Debug("Successfully started " + nodeName) return node, nil } + +func (n *WakuNode) StopAndDestroy() error { + if n == nil { + logger := GetLogger() + err := errors.New("waku node is nil") + logger.Error("Failed to stop and destroy", zap.Error(err)) + return err + } + + logger := n.logger.Named(n.nodeName) + + logger.Debug("Stopping " + n.nodeName) + + err := n.Stop() + if err != nil { + logger.Error("Failed to stop "+n.nodeName, zap.Error(err)) + return err + } + + logger.Debug("Destroying " + n.nodeName) + + err = n.Destroy() + if err != nil { + logger.Error("Failed to destroy "+n.nodeName, zap.Error(err)) + return err + } + + logger.Debug("Successfully stopped and destroyed " + n.nodeName) + return nil +} + +func (n *WakuNode) ConnectPeer(targetNode *WakuNode) error { + logger := n.logger.Named(n.nodeName) + logger.Debug("Connecting " + n.nodeName + " to " + targetNode.nodeName) + + targetPeerID, err := targetNode.PeerID() + if err != nil { + logger.Error("Failed to get PeerID of target node "+targetNode.nodeName, zap.Error(err)) + return err + } + + targetAddr, err := targetNode.ListenAddresses() + if err != nil || len(targetAddr) == 0 { + logger.Error("Failed to get listen addresses for target node "+targetNode.nodeName, zap.Error(err)) + return errors.New("target node has no listen addresses") + } + + logger.Debug("Attempting connection to peer " + targetPeerID.String()) + + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + + err = n.Connect(ctx, targetAddr[0]) + if err != nil { + logger.Error("Failed to connect to peer "+targetPeerID.String(), zap.Error(err)) + return err + } + + logger.Debug("Successfully connected " + n.nodeName + " to " + targetNode.nodeName) + return nil +} + +func (n *WakuNode) DisconnectPeer(target *WakuNode) error { + logger := n.logger.Named(n.nodeName) + logger.Debug("Disconnecting " + n.nodeName + " from " + target.nodeName) + + targetPeerID, err := target.PeerID() + if err != nil { + logger.Error("Failed to get PeerID of target node "+target.nodeName, zap.Error(err)) + return err + } + + err = n.DisconnectPeerByID(targetPeerID) + if err != nil { + logger.Error("Failed to disconnect peer "+targetPeerID.String(), zap.Error(err)) + return err + } + + logger.Debug("Successfully disconnected " + n.nodeName + " from " + target.nodeName) + return nil +} diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index f721548..48a22bc 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -21,13 +21,13 @@ type NwakuInfo struct { EnrUri string `json:"enrUri"` } -func logrusToZap(log *logrus.Logger) *zap.Logger { +func logrusToZap(log *logrus.Logger, nodeName string) *zap.Logger { config := zap.NewDevelopmentConfig() config.EncoderConfig.TimeKey = "" // Remove timestamp duplication config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) zapLogger, _ := config.Build() - return zapLogger + return zapLogger.Named(nodeName) } func GenerateUniquePort() int {