diff --git a/go.mod b/go.mod index 363f38e..166611d 100644 --- a/go.mod +++ b/go.mod @@ -35,10 +35,13 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect go.uber.org/zap v1.27.0 golang.org/x/crypto v0.26.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect + google.golang.org/protobuf v1.34.2 ) -require github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057 +require ( + github.com/sirupsen/logrus v1.2.0 + github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057 +) require ( github.com/beorn7/perks v1.0.1 // indirect @@ -56,6 +59,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -79,6 +83,7 @@ require ( golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.24.0 // indirect + golang.org/x/term v0.23.0 // indirect golang.org/x/time v0.5.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.3.0 // indirect diff --git a/go.sum b/go.sum index dcf97e7..1d53bd8 100644 --- a/go.sum +++ b/go.sum @@ -297,6 +297,7 @@ github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0= github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoKtbmZk= @@ -514,6 +515,7 @@ github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= @@ -732,6 +734,8 @@ golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= diff --git a/waku/logging.go b/waku/logging.go new file mode 100644 index 0000000..6d610aa --- /dev/null +++ b/waku/logging.go @@ -0,0 +1,43 @@ +package waku + +import ( + "sync" + + "github.com/sirupsen/logrus" +) + +var ( + once sync.Once + instance *logrus.Logger +) + +// _getLogger ensures we always return the same logger instance (private function) +func _getLogger() *logrus.Logger { + once.Do(func() { + instance = logrus.New() + instance.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + }) + instance.SetLevel(logrus.DebugLevel) // Set default log level + }) + return instance +} + +// Debug logs a debug message +func Debug(msg string, args ...interface{}) { + _getLogger().WithFields(logrus.Fields{}).Debugf(msg, args...) +} + +// Info logs an info message +func Info(msg string, args ...interface{}) { + _getLogger().WithFields(logrus.Fields{}).Infof(msg, args...) +} + +// Error logs an error message +func Error(msg string, args ...interface{}) { + _getLogger().WithFields(logrus.Fields{}).Errorf(msg, args...) +} + +func Warn(msg string, args ...interface{}) { + _getLogger().WithFields(logrus.Fields{}).Warnf(msg, args...) +} diff --git a/waku/nwaku.go b/waku/nwaku.go index d59cad5..e3dc669 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -433,17 +433,17 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { type WakuNode struct { wakuCtx unsafe.Pointer config *WakuConfig - logger *zap.Logger MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange + nodeName string } -func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { - +func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) { + Debug("Creating new WakuNode: %v", nodeName) n := &WakuNode{ - config: config, - logger: logger, + config: config, + nodeName: nodeName, } wg := sync.WaitGroup{} @@ -454,14 +454,14 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { } var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonConfig)) defer C.freeResp(resp) if C.getRet(resp) != C.RET_OK { - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("error wakuNew for %s: %v", nodeName, errMsg) return nil, errors.New(errMsg) } @@ -473,10 +473,10 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize) - // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) registerNode(n) + Debug("Successfully created WakuNode: %s", nodeName) return n, nil } @@ -538,7 +538,8 @@ func (n *WakuNode) OnEvent(eventStr string) { jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) if err != nil { - n.logger.Error("could not unmarshal nwaku event string", zap.Error(err)) + Error("could not unmarshal nwaku event string: %v", err) + return } @@ -555,7 +556,7 @@ func (n *WakuNode) OnEvent(eventStr string) { func (n *WakuNode) parseMessageEvent(eventStr string) { envelope, err := common.NewEnvelope(eventStr) if err != nil { - n.logger.Error("could not parse message", zap.Error(err)) + Error("could not parse message %v", err) } n.MsgChan <- envelope } @@ -565,7 +566,7 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { topicHealth := topicHealth{} err := json.Unmarshal([]byte(eventStr), &topicHealth) if err != nil { - n.logger.Error("could not parse topic health change", zap.Error(err)) + Error("could not parse topic health change %v", err) } n.TopicHealthChan <- topicHealth } @@ -575,21 +576,20 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { connectionChange := connectionChange{} err := json.Unmarshal([]byte(eventStr), &connectionChange) if err != nil { - n.logger.Error("could not parse connection change", zap.Error(err)) + Error("could not parse connection change %v", err) } n.ConnectionChangeChan <- connectionChange } func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { - var pubsubTopic string - if len(optPubsubTopic) == 0 { - pubsubTopic = "" - } else { + + Debug("Fetching number of connected relay peers for %s", n.nodeName) + pubsubTopic := "" + if len(optPubsubTopic) > 0 { pubsubTopic = optPubsubTopic[0] } wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -604,13 +604,16 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error() - return 0, errors.New(errMsg) + Error("Failed to convert relay peer count for %s: %v", n.nodeName, err) + return 0, err } + Debug("Successfully fetched number of connected relay peers for %s: %d", n.nodeName, numPeers) return numPeers, nil } - errMsg := "error GetNumConnectedRelayPeers: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + + errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg) + return 0, errors.New(errMsg) } @@ -634,8 +637,15 @@ func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { } func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - wg := sync.WaitGroup{} + if n == nil { + err := errors.New("waku node is nil") + Error("Failed to get connected peers %v", err) + return nil, err + } + Debug("Fetching connected peers for %v", n.nodeName) + + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -646,23 +656,29 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { + Debug("No connected peers found for " + n.nodeName) return nil, nil } - // peersStr contains a comma-separated list of peer ids - itemsPeerIds := strings.Split(peersStr, ",") + + peerIDs := strings.Split(peersStr, ",") var peers peer.IDSlice - for _, peerId := range itemsPeerIds { - id, err := peer.Decode(peerId) + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) if err != nil { - return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err) + Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) + return nil, err } peers = append(peers, id) } + + Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) return peers, nil } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg) + errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) + + return nil, errors.New(errMsg) } func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { @@ -965,54 +981,74 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D } func (n *WakuNode) Start() error { - wg := sync.WaitGroup{} + Debug("Starting %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) C.cGoWakuStart(n.wakuCtx, resp) wg.Wait() + if C.getRet(resp) == C.RET_OK { + Debug("Successfully started %s", n.nodeName) return nil } errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start %s: %s", n.nodeName, errMsg) + return errors.New(errMsg) } func (n *WakuNode) Stop() error { - wg := sync.WaitGroup{} + Debug("Stopping %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) C.cGoWakuStop(n.wakuCtx, resp) wg.Wait() + if C.getRet(resp) == C.RET_OK { - unregisterNode(n) + Debug("Successfully stopped %s", n.nodeName) return nil } errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to stop %s: %s", n.nodeName, errMsg) + return errors.New(errMsg) } func (n *WakuNode) Destroy() error { - wg := sync.WaitGroup{} + if n == nil { + err := errors.New("waku node is nil") + Error("Failed to destroy %v", err) + return err + } + Debug("Destroying %v", n.nodeName) + + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) wg.Add(1) C.cGoWakuDestroy(n.wakuCtx, resp) wg.Wait() + if C.getRet(resp) == C.RET_OK { + Debug("Successfully destroyed " + n.nodeName) return nil } errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) + return errors.New(errMsg) } @@ -1240,11 +1276,24 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p } func (n *WakuNode) GetNumConnectedPeers() (int, error) { - peers, err := n.GetConnectedPeers() - if err != nil { + if n == nil { + err := errors.New("waku node is nil") + Error("Failed to get number of connected peers %v", err) return 0, err } - return len(peers), nil + + Debug("Fetching number of connected peers for %v", n.nodeName) + + peers, err := n.GetConnectedPeers() + if err != nil { + Error("Failed to fetch connected peers for %v %v ", n.nodeName, err) + return 0, err + } + + numPeers := len(peers) + Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) + + return numPeers, nil } func getContextTimeoutMilliseconds(ctx context.Context) int { @@ -1259,17 +1308,18 @@ func FormatWakuRelayTopic(clusterId uint16, shard uint16) string { return fmt.Sprintf("/waku/2/rs/%d/%d", clusterId, shard) } -func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (int, int, error) { +func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { if tcpPort == 0 { for i := 0; i < 10; i++ { tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0")) if err != nil { - logger.Warn("unable to resolve tcp addr: %v", zap.Error(err)) + Warn("unable to resolve tcp addr: %v", zap.Error(err)) continue } tcpListener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - logger.Warn("unable to listen on addr", zap.Stringer("addr", tcpAddr), zap.Error(err)) + Warn("unable to listen on addr: addr=%v, error=%v", tcpAddr, err) + continue } tcpPort = tcpListener.Addr().(*net.TCPAddr).Port @@ -1285,13 +1335,14 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in for i := 0; i < 10; i++ { udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("localhost", "0")) if err != nil { - logger.Warn("unable to resolve udp addr: %v", zap.Error(err)) + Warn("unable to resolve udp addr: %v", err) continue } udpListener, err := net.ListenUDP("udp", udpAddr) if err != nil { - logger.Warn("unable to listen on addr", zap.Stringer("addr", udpAddr), zap.Error(err)) + Warn("unable to listen on addr: addr=%v, error=%v", udpAddr, err) + continue } @@ -1306,3 +1357,109 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in return tcpPort, discV5UDPPort, nil } + +// Create & start node +func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { + + Debug("Initializing %s", nodeName) + + var nodeCfg WakuConfig + if customCfg == nil { + nodeCfg = DefaultWakuConfig + } else { + nodeCfg = *customCfg + } + + Debug("Creating %s", nodeName) + node, err := NewWakuNode(&nodeCfg, nodeName) + if err != nil { + Error("Failed to create %s: %v", nodeName, err) + return nil, err + } + + Debug("Starting %s", nodeName) + if err := node.Start(); err != nil { + Error("Failed to start %s: %v", nodeName, err) + return nil, err + } + + Debug("Successfully started %s", nodeName) + return node, nil +} + +func (n *WakuNode) StopAndDestroy() error { + if n == nil { + err := errors.New("waku node is nil") + Error("Failed to stop and destroy: %v", err) + return err + } + + Debug("Stopping %s", n.nodeName) + + err := n.Stop() + if err != nil { + Error("Failed to stop %s: %v", n.nodeName, err) + return err + } + + Debug("Destroying %s", n.nodeName) + + err = n.Destroy() + if err != nil { + Error("Failed to destroy %s: %v", n.nodeName, err) + return err + } + + Debug("Successfully stopped and destroyed %s", n.nodeName) + return nil +} + +func (n *WakuNode) ConnectPeer(targetNode *WakuNode) error { + + Debug("Connecting %s to %s", n.nodeName, targetNode.nodeName) + + targetPeerID, err := targetNode.PeerID() + if err != nil { + Error("Failed to get PeerID of target node %s: %v", targetNode.nodeName, err) + return err + } + + targetAddr, err := targetNode.ListenAddresses() + if err != nil || len(targetAddr) == 0 { + Error("Failed to get listen addresses for target node %s: %v", targetNode.nodeName, err) + return errors.New("target node has no listen addresses") + } + + Debug("Attempting connection to peer %s", targetPeerID.String()) + + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + + err = n.Connect(ctx, targetAddr[0]) + if err != nil { + Error("Failed to connect to peer %s: %v", targetPeerID.String(), err) + return err + } + + Debug("Successfully connected %s to %s", n.nodeName, targetNode.nodeName) + return nil +} + +func (n *WakuNode) DisconnectPeer(target *WakuNode) error { + Debug("Disconnecting %s from %s", n.nodeName, target.nodeName) + + targetPeerID, err := target.PeerID() + if err != nil { + Error("Failed to get PeerID of target node %s: %v", target.nodeName, err) + return err + } + + err = n.DisconnectPeerByID(targetPeerID) + if err != nil { + Error("Failed to disconnect peer %s: %v", targetPeerID.String(), err) + return err + } + + Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) + return nil +} diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index ee38b32..96450a7 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -37,9 +37,6 @@ func TestBasicWaku(t *testing.T) { // ctx := context.Background() - logger, err := zap.NewDevelopment() - require.NoError(t, err) - nwakuConfig := WakuConfig{ Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", Relay: true, @@ -55,7 +52,7 @@ func TestBasicWaku(t *testing.T) { storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - w, err := NewWakuNode(&nwakuConfig, logger.Named("nwaku")) + w, err := NewWakuNode(&nwakuConfig, "nwaku") require.NoError(t, err) require.NoError(t, w.Start()) @@ -185,15 +182,13 @@ func TestBasicWaku(t *testing.T) { */ require.NoError(t, w.Stop()) + } func TestPeerExchange(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) - // start node that will be discovered by PeerExchange discV5NodeWakuConfig := WakuConfig{ Relay: true, @@ -206,7 +201,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: tcpPort, } - discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node")) + discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, "discV5Node") require.NoError(t, err) require.NoError(t, discV5Node.Start()) @@ -216,7 +211,7 @@ func TestPeerExchange(t *testing.T) { discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node which serves as PeerExchange server @@ -232,7 +227,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: tcpPort, } - pxServerNode, err := NewWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode")) + pxServerNode, err := NewWakuNode(&pxServerWakuConfig, "pxServerNode") require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -265,7 +260,7 @@ func TestPeerExchange(t *testing.T) { }, options) require.NoError(t, err) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start light node which uses PeerExchange to discover peers @@ -281,7 +276,7 @@ func TestPeerExchange(t *testing.T) { PeerExchangeNode: serverNodeMa[0].String(), } - lightNode, err := NewWakuNode(&pxClientWakuConfig, logger.Named("lightNode")) + lightNode, err := NewWakuNode(&pxClientWakuConfig, "lightNode") require.NoError(t, err) require.NoError(t, lightNode.Start()) @@ -324,10 +319,8 @@ func TestPeerExchange(t *testing.T) { } func TestDnsDiscover(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) nameserver := "8.8.8.8" @@ -340,7 +333,7 @@ func TestDnsDiscover(t *testing.T) { TcpPort: tcpPort, } - node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node")) + node, err := NewWakuNode(&nodeWakuConfig, "node") require.NoError(t, err) require.NoError(t, node.Start()) sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im" @@ -355,10 +348,8 @@ func TestDnsDiscover(t *testing.T) { } func TestDial(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will initiate the dial @@ -372,11 +363,11 @@ func TestDial(t *testing.T) { TcpPort: tcpPort, } - dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode")) + dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode") require.NoError(t, err) require.NoError(t, dialerNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will receive the dial @@ -390,7 +381,7 @@ func TestDial(t *testing.T) { TcpPort: tcpPort, } - receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode") require.NoError(t, err) require.NoError(t, receiverNode.Start()) receiverMultiaddr, err := receiverNode.ListenAddresses() @@ -423,10 +414,7 @@ func TestDial(t *testing.T) { } func TestRelay(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will send the message @@ -440,11 +428,11 @@ func TestRelay(t *testing.T) { TcpPort: tcpPort, } - senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) + senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode") require.NoError(t, err) require.NoError(t, senderNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will receive the message @@ -457,7 +445,7 @@ func TestRelay(t *testing.T) { Discv5UdpPort: udpPort, TcpPort: tcpPort, } - receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode") require.NoError(t, err) require.NoError(t, receiverNode.Start()) receiverMultiaddr, err := receiverNode.ListenAddresses() @@ -507,12 +495,10 @@ func TestRelay(t *testing.T) { } func TestTopicHealth(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) clusterId := uint16(16) shardId := uint16(64) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node1 @@ -526,11 +512,11 @@ func TestTopicHealth(t *testing.T) { TcpPort: tcpPort, } - node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) + node1, err := NewWakuNode(&wakuConfig1, "node1") require.NoError(t, err) require.NoError(t, node1.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node2 @@ -543,7 +529,7 @@ func TestTopicHealth(t *testing.T) { Discv5UdpPort: udpPort, TcpPort: tcpPort, } - node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) + node2, err := NewWakuNode(&wakuConfig2, "node2") require.NoError(t, err) require.NoError(t, node2.Start()) multiaddr2, err := node2.ListenAddresses() @@ -582,12 +568,10 @@ func TestTopicHealth(t *testing.T) { } func TestConnectionChange(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) clusterId := uint16(16) shardId := uint16(64) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node1 @@ -601,11 +585,11 @@ func TestConnectionChange(t *testing.T) { TcpPort: tcpPort, } - node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) + node1, err := NewWakuNode(&wakuConfig1, "node1") require.NoError(t, err) require.NoError(t, node1.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node2 @@ -618,7 +602,7 @@ func TestConnectionChange(t *testing.T) { Discv5UdpPort: udpPort, TcpPort: tcpPort, } - node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) + node2, err := NewWakuNode(&wakuConfig2, "node2") require.NoError(t, err) require.NoError(t, node2.Start()) multiaddr2, err := node2.ListenAddresses() @@ -673,10 +657,8 @@ func TestConnectionChange(t *testing.T) { } func TestStore(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will send the message @@ -692,11 +674,11 @@ func TestStore(t *testing.T) { LegacyStore: false, } - senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) + senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode") require.NoError(t, err) require.NoError(t, senderNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will receive the message @@ -711,7 +693,7 @@ func TestStore(t *testing.T) { TcpPort: tcpPort, LegacyStore: false, } - receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode") require.NoError(t, err) require.NoError(t, receiverNode.Start()) receiverMultiaddr, err := receiverNode.ListenAddresses() @@ -859,7 +841,7 @@ func TestParallelPings(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) - tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will initiate the dial @@ -873,11 +855,11 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode")) + dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode") require.NoError(t, err) require.NoError(t, dialerNode.Start()) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) receiverNodeWakuConfig1 := WakuConfig{ @@ -890,7 +872,7 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, logger.Named("receiverNode1")) + receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, "receiverNode1") require.NoError(t, err) require.NoError(t, receiverNode1.Start()) receiverMultiaddr1, err := receiverNode1.ListenAddresses() @@ -898,7 +880,7 @@ func TestParallelPings(t *testing.T) { require.NotNil(t, receiverMultiaddr1) require.True(t, len(receiverMultiaddr1) > 0) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) receiverNodeWakuConfig2 := WakuConfig{ @@ -911,7 +893,7 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, logger.Named("receiverNode2")) + receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, "receiverNode2") require.NoError(t, err) require.NoError(t, receiverNode2.Start()) receiverMultiaddr2, err := receiverNode2.ListenAddresses() @@ -919,7 +901,7 @@ func TestParallelPings(t *testing.T) { require.NotNil(t, receiverMultiaddr2) require.True(t, len(receiverMultiaddr2) > 0) - tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger) + tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) receiverNodeWakuConfig3 := WakuConfig{ @@ -932,7 +914,7 @@ func TestParallelPings(t *testing.T) { TcpPort: tcpPort, } - receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, logger.Named("receiverNode3")) + receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, "receiverNode3") require.NoError(t, err) require.NoError(t, receiverNode3.Start()) receiverMultiaddr3, err := receiverNode3.ListenAddresses() diff --git a/waku/test_data.go b/waku/test_data.go new file mode 100644 index 0000000..bdb139c --- /dev/null +++ b/waku/test_data.go @@ -0,0 +1,39 @@ +package waku + +import ( + "time" +) + +var DefaultWakuConfig WakuConfig + +func init() { + + udpPort, _, err1 := GetFreePortIfNeeded(0, 0) + tcpPort, _, err2 := GetFreePortIfNeeded(0, 0) + + if err1 != nil || err2 != nil { + Error("Failed to get free ports %v %v", err1, err2) + } + + DefaultWakuConfig = WakuConfig{ + Relay: false, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: false, + Store: false, + Filter: false, + Lightpush: false, + Discv5UdpPort: udpPort, + TcpPort: tcpPort, + } +} + +const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node + +var DefaultPubsubTopic = "/waku/2/rs/16/64" +var ( + MinPort = 1024 // Minimum allowable port (exported) + MaxPort = 65535 // Maximum allowable port (exported) +)