Adding changes to APIs in nwaku.go

This commit is contained in:
aya 2025-02-09 14:21:38 +02:00
parent 1878db0e12
commit 0940993ce6
2 changed files with 183 additions and 36 deletions

View File

@ -437,6 +437,7 @@ type WakuNode struct {
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
ConnectionChangeChan chan connectionChange
nodeName string
}
func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
@ -581,15 +582,15 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) {
}
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {
pubsubTopic = ""
} else {
logger := n.logger.Named(n.nodeName)
logger.Debug("Fetching number of connected relay peers for " + n.nodeName)
pubsubTopic := ""
if len(optPubsubTopic) > 0 {
pubsubTopic = optPubsubTopic[0]
}
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
@ -604,13 +605,17 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error()
return 0, errors.New(errMsg)
logger.Error("Failed to convert relay peer count for "+n.nodeName, zap.Error(err))
return 0, err
}
logger.Debug("Successfully fetched number of connected relay peers for "+n.nodeName, zap.Int("count", numPeers))
return numPeers, nil
}
errMsg := "error GetNumConnectedRelayPeers: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to get number of connected relay peers for "+n.nodeName, zap.String("error", errMsg))
return 0, errors.New(errMsg)
}
@ -634,8 +639,17 @@ func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
}
func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
wg := sync.WaitGroup{}
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to get connected peers", zap.Error(err))
return nil, err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Fetching connected peers for " + n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
@ -646,23 +660,29 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
logger.Debug("No connected peers found for " + n.nodeName)
return nil, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
peerIDs := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
for _, peerID := range peerIDs {
id, err := peer.Decode(peerID)
if err != nil {
return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err)
logger.Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err))
return nil, err
}
peers = append(peers, id)
}
logger.Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers)))
return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg)
errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg))
return nil, errors.New(errMsg)
}
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
@ -982,37 +1002,66 @@ func (n *WakuNode) Start() error {
}
func (n *WakuNode) Stop() error {
wg := sync.WaitGroup{}
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to stop", zap.Error(err))
return err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Stopping " + n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStop(n.wakuCtx, resp)
C.cGoWakuStop(n.wakuCtx, resp) // Calls the C function to stop the Waku node
wg.Wait()
if C.getRet(resp) == C.RET_OK {
unregisterNode(n)
unregisterNode(n) // Ensure the node is properly unregistered
logger.Debug("Successfully stopped " + n.nodeName)
return nil
}
// Extract error message from C response
errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to stop "+n.nodeName, zap.String("error", errMsg))
return errors.New(errMsg)
}
func (n *WakuNode) Destroy() error {
wg := sync.WaitGroup{}
if n == nil {
logger := GetLogger()
err := errors.New("Waku node is nil")
logger.Error("Failed to destroy", zap.Error(err))
return err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Destroying " + n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuDestroy(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
logger.Debug("Successfully destroyed " + n.nodeName)
return nil
}
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg))
return errors.New(errMsg)
}
@ -1240,11 +1289,26 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p
}
func (n *WakuNode) GetNumConnectedPeers() (int, error) {
peers, err := n.GetConnectedPeers()
if err != nil {
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to get number of connected peers", zap.Error(err))
return 0, err
}
return len(peers), nil
logger := n.logger.Named(n.nodeName)
logger.Debug("Fetching number of connected peers for " + n.nodeName)
peers, err := n.GetConnectedPeers()
if err != nil {
logger.Error("Failed to fetch connected peers for "+n.nodeName, zap.Error(err))
return 0, err
}
numPeers := len(peers)
logger.Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers))
return numPeers, nil
}
func getContextTimeoutMilliseconds(ctx context.Context) int {
@ -1307,36 +1371,119 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in
return tcpPort, discV5UDPPort, nil
}
func StartWakuNode(customCfg *WakuConfig) (*WakuNode, error) {
// Create & start node
func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) {
logrusLogger := GetLogger()
logger := logrusToZap(logrusLogger)
logger := logrusToZap(logrusLogger, nodeName) // Convert logrus to zap with node name
logger.Debug("Initializing Waku node")
logger.Debug("Initializing " + nodeName)
var nodeCfg WakuConfig
if customCfg == nil {
nodeCfg = DefaultWakuConfig // Use the default configuration
nodeCfg = DefaultWakuConfig
} else {
nodeCfg = *customCfg
}
// Assign dynamically generated ports
nodeCfg.Discv5UdpPort = GenerateUniquePort()
nodeCfg.TcpPort = GenerateUniquePort()
logger.Debug("Creating Waku node")
logger.Debug("Creating " + nodeName)
node, err := NewWakuNode(&nodeCfg, logger)
if err != nil {
logger.Error("Failed to create Waku node", zap.Error(err))
logger.Error("Failed to create "+nodeName, zap.Error(err))
return nil, err
}
logger.Debug("Starting Waku node")
node.nodeName = nodeName
logger.Debug("Starting " + nodeName)
if err := node.Start(); err != nil {
logger.Error("Failed to start Waku node", zap.Error(err))
logger.Error("Failed to start "+nodeName, zap.Error(err))
return nil, err
}
logger.Debug("Successfully started Waku node")
logger.Debug("Successfully started " + nodeName)
return node, nil
}
func (n *WakuNode) StopAndDestroy() error {
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to stop and destroy", zap.Error(err))
return err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Stopping " + n.nodeName)
err := n.Stop()
if err != nil {
logger.Error("Failed to stop "+n.nodeName, zap.Error(err))
return err
}
logger.Debug("Destroying " + n.nodeName)
err = n.Destroy()
if err != nil {
logger.Error("Failed to destroy "+n.nodeName, zap.Error(err))
return err
}
logger.Debug("Successfully stopped and destroyed " + n.nodeName)
return nil
}
func (n *WakuNode) ConnectPeer(targetNode *WakuNode) error {
logger := n.logger.Named(n.nodeName)
logger.Debug("Connecting " + n.nodeName + " to " + targetNode.nodeName)
targetPeerID, err := targetNode.PeerID()
if err != nil {
logger.Error("Failed to get PeerID of target node "+targetNode.nodeName, zap.Error(err))
return err
}
targetAddr, err := targetNode.ListenAddresses()
if err != nil || len(targetAddr) == 0 {
logger.Error("Failed to get listen addresses for target node "+targetNode.nodeName, zap.Error(err))
return errors.New("target node has no listen addresses")
}
logger.Debug("Attempting connection to peer " + targetPeerID.String())
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = n.Connect(ctx, targetAddr[0])
if err != nil {
logger.Error("Failed to connect to peer "+targetPeerID.String(), zap.Error(err))
return err
}
logger.Debug("Successfully connected " + n.nodeName + " to " + targetNode.nodeName)
return nil
}
func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
logger := n.logger.Named(n.nodeName)
logger.Debug("Disconnecting " + n.nodeName + " from " + target.nodeName)
targetPeerID, err := target.PeerID()
if err != nil {
logger.Error("Failed to get PeerID of target node "+target.nodeName, zap.Error(err))
return err
}
err = n.DisconnectPeerByID(targetPeerID)
if err != nil {
logger.Error("Failed to disconnect peer "+targetPeerID.String(), zap.Error(err))
return err
}
logger.Debug("Successfully disconnected " + n.nodeName + " from " + target.nodeName)
return nil
}

View File

@ -21,13 +21,13 @@ type NwakuInfo struct {
EnrUri string `json:"enrUri"`
}
func logrusToZap(log *logrus.Logger) *zap.Logger {
func logrusToZap(log *logrus.Logger, nodeName string) *zap.Logger {
config := zap.NewDevelopmentConfig()
config.EncoderConfig.TimeKey = "" // Remove timestamp duplication
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
zapLogger, _ := config.Build()
return zapLogger
return zapLogger.Named(nodeName)
}
func GenerateUniquePort() int {