diff --git a/waku/logging.go b/waku/logging.go index c9c6fdc..6d610aa 100644 --- a/waku/logging.go +++ b/waku/logging.go @@ -11,8 +11,8 @@ var ( instance *logrus.Logger ) -// GetLogger ensures we always return the same logger instance -func GetLogger() *logrus.Logger { +// _getLogger ensures we always return the same logger instance (private function) +func _getLogger() *logrus.Logger { once.Do(func() { instance = logrus.New() instance.SetFormatter(&logrus.TextFormatter{ @@ -25,15 +25,19 @@ func GetLogger() *logrus.Logger { // Debug logs a debug message func Debug(msg string, args ...interface{}) { - GetLogger().WithFields(logrus.Fields{}).Debugf(msg, args...) + _getLogger().WithFields(logrus.Fields{}).Debugf(msg, args...) } // Info logs an info message func Info(msg string, args ...interface{}) { - GetLogger().WithFields(logrus.Fields{}).Infof(msg, args...) + _getLogger().WithFields(logrus.Fields{}).Infof(msg, args...) } // Error logs an error message func Error(msg string, args ...interface{}) { - GetLogger().WithFields(logrus.Fields{}).Errorf(msg, args...) + _getLogger().WithFields(logrus.Fields{}).Errorf(msg, args...) +} + +func Warn(msg string, args ...interface{}) { + _getLogger().WithFields(logrus.Fields{}).Warnf(msg, args...) } diff --git a/waku/nwaku.go b/waku/nwaku.go index 4a4ef88..e3dc669 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -433,18 +433,17 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { type WakuNode struct { wakuCtx unsafe.Pointer config *WakuConfig - logger *zap.Logger MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange nodeName string } -func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { - +func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) { + Debug("Creating new WakuNode: %v", nodeName) n := &WakuNode{ - config: config, - logger: logger, + config: config, + nodeName: nodeName, } wg := sync.WaitGroup{} @@ -455,14 +454,14 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { } var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonConfig)) defer C.freeResp(resp) if C.getRet(resp) != C.RET_OK { - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("error wakuNew for %s: %v", nodeName, errMsg) return nil, errors.New(errMsg) } @@ -474,10 +473,10 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize) - // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) registerNode(n) + Debug("Successfully created WakuNode: %s", nodeName) return n, nil } @@ -539,7 +538,8 @@ func (n *WakuNode) OnEvent(eventStr string) { jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) if err != nil { - n.logger.Error("could not unmarshal nwaku event string", zap.Error(err)) + Error("could not unmarshal nwaku event string: %v", err) + return } @@ -556,7 +556,7 @@ func (n *WakuNode) OnEvent(eventStr string) { func (n *WakuNode) parseMessageEvent(eventStr string) { envelope, err := common.NewEnvelope(eventStr) if err != nil { - n.logger.Error("could not parse message", zap.Error(err)) + Error("could not parse message %v", err) } n.MsgChan <- envelope } @@ -566,7 +566,7 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { topicHealth := topicHealth{} err := json.Unmarshal([]byte(eventStr), &topicHealth) if err != nil { - n.logger.Error("could not parse topic health change", zap.Error(err)) + Error("could not parse topic health change %v", err) } n.TopicHealthChan <- topicHealth } @@ -576,15 +576,14 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { connectionChange := connectionChange{} err := json.Unmarshal([]byte(eventStr), &connectionChange) if err != nil { - n.logger.Error("could not parse connection change", zap.Error(err)) + Error("could not parse connection change %v", err) } n.ConnectionChangeChan <- connectionChange } func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { - logger := n.logger.Named(n.nodeName) - logger.Debug("Fetching number of connected relay peers for " + n.nodeName) + Debug("Fetching number of connected relay peers for %s", n.nodeName) pubsubTopic := "" if len(optPubsubTopic) > 0 { pubsubTopic = optPubsubTopic[0] @@ -605,16 +604,15 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - logger.Error("Failed to convert relay peer count for "+n.nodeName, zap.Error(err)) + Error("Failed to convert relay peer count for %s: %v", n.nodeName, err) return 0, err } - - logger.Debug("Successfully fetched number of connected relay peers for "+n.nodeName, zap.Int("count", numPeers)) + Debug("Successfully fetched number of connected relay peers for %s: %d", n.nodeName, numPeers) return numPeers, nil } errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - logger.Error("Failed to get number of connected relay peers for "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg) return 0, errors.New(errMsg) } @@ -640,14 +638,12 @@ func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if n == nil { - logger := GetLogger() err := errors.New("waku node is nil") - logger.Error("Failed to get connected peers", zap.Error(err)) + Error("Failed to get connected peers %v", err) return nil, err } - logger := n.logger.Named(n.nodeName) - logger.Debug("Fetching connected peers for " + n.nodeName) + Debug("Fetching connected peers for %v", n.nodeName) wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -660,7 +656,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - logger.Debug("No connected peers found for " + n.nodeName) + Debug("No connected peers found for " + n.nodeName) return nil, nil } @@ -669,18 +665,18 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { for _, peerID := range peerIDs { id, err := peer.Decode(peerID) if err != nil { - logger.Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) + Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) return nil, err } peers = append(peers, id) } - logger.Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) + Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) return peers, nil } errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - logger.Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) return nil, errors.New(errMsg) } @@ -985,66 +981,57 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D } func (n *WakuNode) Start() error { - wg := sync.WaitGroup{} + Debug("Starting %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) C.cGoWakuStart(n.wakuCtx, resp) wg.Wait() + if C.getRet(resp) == C.RET_OK { + Debug("Successfully started %s", n.nodeName) return nil } errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start %s: %s", n.nodeName, errMsg) + return errors.New(errMsg) } func (n *WakuNode) Stop() error { - if n == nil { - logger := GetLogger() - err := errors.New("waku node is nil") - logger.Error("Failed to stop", zap.Error(err)) - return err - } - - logger := n.logger.Named(n.nodeName) - - logger.Debug("Stopping " + n.nodeName) + Debug("Stopping %s", n.nodeName) wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) - C.cGoWakuStop(n.wakuCtx, resp) // Calls the C function to stop the Waku node + C.cGoWakuStop(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { - unregisterNode(n) // Ensure the node is properly unregistered - logger.Debug("Successfully stopped " + n.nodeName) + Debug("Successfully stopped %s", n.nodeName) return nil } - // Extract error message from C response errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - logger.Error("Failed to stop "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to stop %s: %s", n.nodeName, errMsg) return errors.New(errMsg) } func (n *WakuNode) Destroy() error { if n == nil { - logger := GetLogger() - err := errors.New("Waku node is nil") - logger.Error("Failed to destroy", zap.Error(err)) + err := errors.New("waku node is nil") + Error("Failed to destroy %v", err) return err } - logger := n.logger.Named(n.nodeName) - - logger.Debug("Destroying " + n.nodeName) + Debug("Destroying %v", n.nodeName) wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -1055,12 +1042,12 @@ func (n *WakuNode) Destroy() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - logger.Debug("Successfully destroyed " + n.nodeName) + Debug("Successfully destroyed " + n.nodeName) return nil } errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - logger.Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) + Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) return errors.New(errMsg) } @@ -1290,23 +1277,21 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p func (n *WakuNode) GetNumConnectedPeers() (int, error) { if n == nil { - logger := GetLogger() err := errors.New("waku node is nil") - logger.Error("Failed to get number of connected peers", zap.Error(err)) + Error("Failed to get number of connected peers %v", err) return 0, err } - logger := n.logger.Named(n.nodeName) - logger.Debug("Fetching number of connected peers for " + n.nodeName) + Debug("Fetching number of connected peers for %v", n.nodeName) peers, err := n.GetConnectedPeers() if err != nil { - logger.Error("Failed to fetch connected peers for "+n.nodeName, zap.Error(err)) + Error("Failed to fetch connected peers for %v %v ", n.nodeName, err) return 0, err } numPeers := len(peers) - logger.Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) + Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) return numPeers, nil } @@ -1323,17 +1308,18 @@ func FormatWakuRelayTopic(clusterId uint16, shard uint16) string { return fmt.Sprintf("/waku/2/rs/%d/%d", clusterId, shard) } -func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (int, int, error) { +func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { if tcpPort == 0 { for i := 0; i < 10; i++ { tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0")) if err != nil { - logger.Warn("unable to resolve tcp addr: %v", zap.Error(err)) + Warn("unable to resolve tcp addr: %v", zap.Error(err)) continue } tcpListener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - logger.Warn("unable to listen on addr", zap.Stringer("addr", tcpAddr), zap.Error(err)) + Warn("unable to listen on addr: addr=%v, error=%v", tcpAddr, err) + continue } tcpPort = tcpListener.Addr().(*net.TCPAddr).Port @@ -1349,13 +1335,14 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in for i := 0; i < 10; i++ { udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("localhost", "0")) if err != nil { - logger.Warn("unable to resolve udp addr: %v", zap.Error(err)) + Warn("unable to resolve udp addr: %v", err) continue } udpListener, err := net.ListenUDP("udp", udpAddr) if err != nil { - logger.Warn("unable to listen on addr", zap.Stringer("addr", udpAddr), zap.Error(err)) + Warn("unable to listen on addr: addr=%v, error=%v", udpAddr, err) + continue } @@ -1373,10 +1360,8 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in // Create & start node func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { - logrusLogger := GetLogger() - logger := logrusToZap(logrusLogger, nodeName) // Convert logrus to zap with node name - logger.Debug("Initializing " + nodeName) + Debug("Initializing %s", nodeName) var nodeCfg WakuConfig if customCfg == nil { @@ -1385,105 +1370,96 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { nodeCfg = *customCfg } - nodeCfg.Discv5UdpPort = GenerateUniquePort() - nodeCfg.TcpPort = GenerateUniquePort() - - logger.Debug("Creating " + nodeName) - node, err := NewWakuNode(&nodeCfg, logger) + Debug("Creating %s", nodeName) + node, err := NewWakuNode(&nodeCfg, nodeName) if err != nil { - logger.Error("Failed to create "+nodeName, zap.Error(err)) + Error("Failed to create %s: %v", nodeName, err) return nil, err } - node.nodeName = nodeName - - logger.Debug("Starting " + nodeName) + Debug("Starting %s", nodeName) if err := node.Start(); err != nil { - logger.Error("Failed to start "+nodeName, zap.Error(err)) + Error("Failed to start %s: %v", nodeName, err) return nil, err } - logger.Debug("Successfully started " + nodeName) + Debug("Successfully started %s", nodeName) return node, nil } func (n *WakuNode) StopAndDestroy() error { if n == nil { - logger := GetLogger() err := errors.New("waku node is nil") - logger.Error("Failed to stop and destroy", zap.Error(err)) + Error("Failed to stop and destroy: %v", err) return err } - logger := n.logger.Named(n.nodeName) - - logger.Debug("Stopping " + n.nodeName) + Debug("Stopping %s", n.nodeName) err := n.Stop() if err != nil { - logger.Error("Failed to stop "+n.nodeName, zap.Error(err)) + Error("Failed to stop %s: %v", n.nodeName, err) return err } - logger.Debug("Destroying " + n.nodeName) + Debug("Destroying %s", n.nodeName) err = n.Destroy() if err != nil { - logger.Error("Failed to destroy "+n.nodeName, zap.Error(err)) + Error("Failed to destroy %s: %v", n.nodeName, err) return err } - logger.Debug("Successfully stopped and destroyed " + n.nodeName) + Debug("Successfully stopped and destroyed %s", n.nodeName) return nil } func (n *WakuNode) ConnectPeer(targetNode *WakuNode) error { - logger := n.logger.Named(n.nodeName) - logger.Debug("Connecting " + n.nodeName + " to " + targetNode.nodeName) + + Debug("Connecting %s to %s", n.nodeName, targetNode.nodeName) targetPeerID, err := targetNode.PeerID() if err != nil { - logger.Error("Failed to get PeerID of target node "+targetNode.nodeName, zap.Error(err)) + Error("Failed to get PeerID of target node %s: %v", targetNode.nodeName, err) return err } targetAddr, err := targetNode.ListenAddresses() if err != nil || len(targetAddr) == 0 { - logger.Error("Failed to get listen addresses for target node "+targetNode.nodeName, zap.Error(err)) + Error("Failed to get listen addresses for target node %s: %v", targetNode.nodeName, err) return errors.New("target node has no listen addresses") } - logger.Debug("Attempting connection to peer " + targetPeerID.String()) + Debug("Attempting connection to peer %s", targetPeerID.String()) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() err = n.Connect(ctx, targetAddr[0]) if err != nil { - logger.Error("Failed to connect to peer "+targetPeerID.String(), zap.Error(err)) + Error("Failed to connect to peer %s: %v", targetPeerID.String(), err) return err } - logger.Debug("Successfully connected " + n.nodeName + " to " + targetNode.nodeName) + Debug("Successfully connected %s to %s", n.nodeName, targetNode.nodeName) return nil } func (n *WakuNode) DisconnectPeer(target *WakuNode) error { - logger := n.logger.Named(n.nodeName) - logger.Debug("Disconnecting " + n.nodeName + " from " + target.nodeName) + Debug("Disconnecting %s from %s", n.nodeName, target.nodeName) targetPeerID, err := target.PeerID() if err != nil { - logger.Error("Failed to get PeerID of target node "+target.nodeName, zap.Error(err)) + Error("Failed to get PeerID of target node %s: %v", target.nodeName, err) return err } err = n.DisconnectPeerByID(targetPeerID) if err != nil { - logger.Error("Failed to disconnect peer "+targetPeerID.String(), zap.Error(err)) + Error("Failed to disconnect peer %s: %v", targetPeerID.String(), err) return err } - logger.Debug("Successfully disconnected " + n.nodeName + " from " + target.nodeName) + Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index ee38b32..96450a7 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -37,9 +37,6 @@ func TestBasicWaku(t *testing.T) { // ctx := context.Background() - logger, err := zap.NewDevelopment() - require.NoError(t, err) - nwakuConfig := WakuConfig{ Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", Relay: true, @@ -55,7 +52,7 @@ func TestBasicWaku(t *testing.T) { storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - w, err := NewWakuNode(&nwakuConfig, logger.Named("nwaku")) + w, err := NewWakuNode(&nwakuConfig, "nwaku") require.NoError(t, err) require.NoError(t, w.Start()) @@ -185,15 +182,13 @@ func TestBasicWaku(t *testing.T) { */ require.NoError(t, w.Stop()) + } func TestPeerExchange(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) - // start node that will be discovered by PeerExchange discV5NodeWakuConfig := WakuConfig{ Relay: true, @@ -206,7 +201,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: tcpPort, } - discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node")) + discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, "discV5Node") require.NoError(t, err) require.NoError(t, discV5Node.Start()) @@ -216,7 +211,7 @@ func TestPeerExchange(t *testing.T) { discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node which serves as PeerExchange server @@ -232,7 +227,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: tcpPort, } - pxServerNode, err := NewWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode")) + pxServerNode, err := NewWakuNode(&pxServerWakuConfig, "pxServerNode") require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -265,7 +260,7 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start light node which uses PeerExchange to discover peers @@ -281,7 +276,7 @@ func TestPeerExchange(t *testing.T) { PeerExchangeNode: serverNodeMa[0].String(), } - lightNode, err := NewWakuNode(&pxClientWakuConfig, logger.Named("lightNode")) + lightNode, err := NewWakuNode(&pxClientWakuConfig, "lightNode") require.NoError(t, err) require.NoError(t, lightNode.Start()) @@ -324,10 +319,8 @@ func TestPeerExchange(t *testing.T) { } func TestDnsDiscover(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) nameserver := "8.8.8.8" @@ -340,7 +333,7 @@ func TestDnsDiscover(t *testing.T) { TcpPort: tcpPort, } - node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node")) + node, err := NewWakuNode(&nodeWakuConfig, "node") require.NoError(t, err) require.NoError(t, node.Start()) sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im" @@ -355,10 +348,8 @@ func TestDnsDiscover(t *testing.T) { } func TestDial(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will initiate the dial @@ -372,11 +363,11 @@ func TestDial(t *testing.T) { TcpPort: tcpPort, } - dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode")) + dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode") require.NoError(t, err) require.NoError(t, dialerNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will receive the dial @@ -390,7 +381,7 @@ func TestDial(t *testing.T) { TcpPort: tcpPort, } - receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode") require.NoError(t, err) require.NoError(t, receiverNode.Start()) receiverMultiaddr, err := receiverNode.ListenAddresses() @@ -423,10 +414,7 @@ func TestDial(t *testing.T) { } func TestRelay(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will send the message @@ -440,11 +428,11 @@ func TestRelay(t *testing.T) { TcpPort: tcpPort, } - senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) + senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode") require.NoError(t, err) require.NoError(t, senderNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will receive the message @@ -457,7 +445,7 @@ func TestRelay(t *testing.T) { Discv5UdpPort: udpPort, TcpPort: tcpPort, } - receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode") require.NoError(t, err) require.NoError(t, receiverNode.Start()) receiverMultiaddr, err := receiverNode.ListenAddresses() @@ -507,12 +495,10 @@ func TestRelay(t *testing.T) { } func TestTopicHealth(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) clusterId := uint16(16) shardId := uint16(64) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node1 @@ -526,11 +512,11 @@ func TestTopicHealth(t *testing.T) { TcpPort: tcpPort, } - node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) + node1, err := NewWakuNode(&wakuConfig1, "node1") require.NoError(t, err) require.NoError(t, node1.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node2 @@ -543,7 +529,7 @@ func TestTopicHealth(t *testing.T) { Discv5UdpPort: udpPort, TcpPort: tcpPort, } - node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) + node2, err := NewWakuNode(&wakuConfig2, "node2") require.NoError(t, err) require.NoError(t, node2.Start()) multiaddr2, err := node2.ListenAddresses() @@ -582,12 +568,10 @@ func TestTopicHealth(t *testing.T) { } func TestConnectionChange(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) clusterId := uint16(16) shardId := uint16(64) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node1 @@ -601,11 +585,11 @@ func TestConnectionChange(t *testing.T) { TcpPort: tcpPort, } - node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) + node1, err := NewWakuNode(&wakuConfig1, "node1") require.NoError(t, err) require.NoError(t, node1.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node2 @@ -618,7 +602,7 @@ func TestConnectionChange(t *testing.T) { Discv5UdpPort: udpPort, TcpPort: tcpPort, } - node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) + node2, err := NewWakuNode(&wakuConfig2, "node2") require.NoError(t, err) require.NoError(t, node2.Start()) multiaddr2, err := node2.ListenAddresses() @@ -673,10 +657,8 @@ func TestConnectionChange(t *testing.T) { } func TestStore(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will send the message @@ -692,11 +674,11 @@ func TestStore(t *testing.T) { LegacyStore: false, } - senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) + senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode") require.NoError(t, err) require.NoError(t, senderNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will receive the message @@ -711,7 +693,7 @@ func TestStore(t *testing.T) { TcpPort: tcpPort, LegacyStore: false, } - receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode") require.NoError(t, err) require.NoError(t, receiverNode.Start()) receiverMultiaddr, err := receiverNode.ListenAddresses() @@ -859,7 +841,7 @@ func TestParallelPings(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will initiate the dial @@ -873,11 +855,11 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode")) + dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode") require.NoError(t, err) require.NoError(t, dialerNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) receiverNodeWakuConfig1 := WakuConfig{ @@ -890,7 +872,7 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, logger.Named("receiverNode1")) + receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, "receiverNode1") require.NoError(t, err) require.NoError(t, receiverNode1.Start()) receiverMultiaddr1, err := receiverNode1.ListenAddresses() @@ -898,7 +880,7 @@ func TestParallelPings(t *testing.T) { require.NotNil(t, receiverMultiaddr1) require.True(t, len(receiverMultiaddr1) > 0) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) receiverNodeWakuConfig2 := WakuConfig{ @@ -911,7 +893,7 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, logger.Named("receiverNode2")) + receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, "receiverNode2") require.NoError(t, err) require.NoError(t, receiverNode2.Start()) receiverMultiaddr2, err := receiverNode2.ListenAddresses() @@ -919,7 +901,7 @@ func TestParallelPings(t *testing.T) { require.NotNil(t, receiverMultiaddr2) require.True(t, len(receiverMultiaddr2) > 0) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) receiverNodeWakuConfig3 := WakuConfig{ @@ -932,7 +914,7 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, logger.Named("receiverNode3")) + receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, "receiverNode3") require.NoError(t, err) require.NoError(t, receiverNode3.Start()) receiverMultiaddr3, err := receiverNode3.ListenAddresses() diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index 48a22bc..bb923fe 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -7,13 +7,9 @@ import ( "net/http" "os" "strconv" - "time" "github.com/cenkalti/backoff/v3" - "github.com/sirupsen/logrus" - "go.uber.org/zap" - "golang.org/x/exp/rand" ) type NwakuInfo struct { @@ -21,31 +17,6 @@ type NwakuInfo struct { EnrUri string `json:"enrUri"` } -func logrusToZap(log *logrus.Logger, nodeName string) *zap.Logger { - config := zap.NewDevelopmentConfig() - config.EncoderConfig.TimeKey = "" // Remove timestamp duplication - config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - - zapLogger, _ := config.Build() - return zapLogger.Named(nodeName) -} - -func GenerateUniquePort() int { - rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) // Local RNG instance - - for { - port := rng.Intn(MaxPort-MinPort+1) + MinPort - - portsMutex.Lock() - if !usedPorts[port] { - usedPorts[port] = true - portsMutex.Unlock() - return port - } - portsMutex.Unlock() - } -} - func GetNwakuInfo(host *string, port *int) (NwakuInfo, error) { nwakuRestPort := 8645 if port != nil { diff --git a/waku/test_data.go b/waku/test_data.go index 5aca49f..bdb139c 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -1,31 +1,39 @@ package waku import ( - "sync" "time" ) +var DefaultWakuConfig WakuConfig + +func init() { + + udpPort, _, err1 := GetFreePortIfNeeded(0, 0) + tcpPort, _, err2 := GetFreePortIfNeeded(0, 0) + + if err1 != nil || err2 != nil { + Error("Failed to get free ports %v %v", err1, err2) + } + + DefaultWakuConfig = WakuConfig{ + Relay: false, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: false, + Store: false, + Filter: false, + Lightpush: false, + Discv5UdpPort: udpPort, + TcpPort: tcpPort, + } +} + const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node var DefaultPubsubTopic = "/waku/2/rs/16/64" var ( - MinPort = 1024 // Minimum allowable port (exported) - MaxPort = 65535 // Maximum allowable port (exported) - usedPorts = make(map[int]bool) // Tracks used ports (internal to package) - portsMutex sync.Mutex // Ensures thread-safe access to usedPorts + MinPort = 1024 // Minimum allowable port (exported) + MaxPort = 65535 // Maximum allowable port (exported) ) - -// Default configuration values -var DefaultWakuConfig = WakuConfig{ - Relay: false, - LogLevel: "DEBUG", - Discv5Discovery: true, - ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: false, - Store: false, - Filter: false, - Lightpush: false, - Discv5UdpPort: GenerateUniquePort(), - TcpPort: GenerateUniquePort(), -}