Merge pull request #29 from waku-org/Test_utils

Adding changes to waku files to remove wrappers
This commit is contained in:
AYAHASSAN287 2025-02-12 12:34:20 +03:00 committed by GitHub
commit 1187190ee0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 329 additions and 99 deletions

9
go.mod
View File

@ -35,10 +35,13 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.26.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
google.golang.org/protobuf v1.34.2
)
require github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057
require (
github.com/sirupsen/logrus v1.2.0
github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057
)
require (
github.com/beorn7/perks v1.0.1 // indirect
@ -56,6 +59,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
@ -79,6 +83,7 @@ require (
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.3.0 // indirect

4
go.sum
View File

@ -297,6 +297,7 @@ github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0=
github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoKtbmZk=
@ -514,6 +515,7 @@ github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
@ -732,6 +734,8 @@ golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=

43
waku/logging.go Normal file
View File

@ -0,0 +1,43 @@
package waku
import (
"sync"
"github.com/sirupsen/logrus"
)
var (
once sync.Once
instance *logrus.Logger
)
// _getLogger ensures we always return the same logger instance (private function)
func _getLogger() *logrus.Logger {
once.Do(func() {
instance = logrus.New()
instance.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
instance.SetLevel(logrus.DebugLevel) // Set default log level
})
return instance
}
// Debug logs a debug message
func Debug(msg string, args ...interface{}) {
_getLogger().WithFields(logrus.Fields{}).Debugf(msg, args...)
}
// Info logs an info message
func Info(msg string, args ...interface{}) {
_getLogger().WithFields(logrus.Fields{}).Infof(msg, args...)
}
// Error logs an error message
func Error(msg string, args ...interface{}) {
_getLogger().WithFields(logrus.Fields{}).Errorf(msg, args...)
}
func Warn(msg string, args ...interface{}) {
_getLogger().WithFields(logrus.Fields{}).Warnf(msg, args...)
}

View File

@ -433,17 +433,17 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
type WakuNode struct {
wakuCtx unsafe.Pointer
config *WakuConfig
logger *zap.Logger
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
ConnectionChangeChan chan connectionChange
nodeName string
}
func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) {
Debug("Creating new WakuNode: %v", nodeName)
n := &WakuNode{
config: config,
logger: logger,
config: config,
nodeName: nodeName,
}
wg := sync.WaitGroup{}
@ -454,14 +454,14 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
}
var cJsonConfig = C.CString(string(jsonConfig))
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.free(unsafe.Pointer(cJsonConfig))
defer C.freeResp(resp)
if C.getRet(resp) != C.RET_OK {
errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("error wakuNew for %s: %v", nodeName, errMsg)
return nil, errors.New(errMsg)
}
@ -473,10 +473,10 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize)
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
registerNode(n)
Debug("Successfully created WakuNode: %s", nodeName)
return n, nil
}
@ -538,7 +538,8 @@ func (n *WakuNode) OnEvent(eventStr string) {
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
n.logger.Error("could not unmarshal nwaku event string", zap.Error(err))
Error("could not unmarshal nwaku event string: %v", err)
return
}
@ -555,7 +556,7 @@ func (n *WakuNode) OnEvent(eventStr string) {
func (n *WakuNode) parseMessageEvent(eventStr string) {
envelope, err := common.NewEnvelope(eventStr)
if err != nil {
n.logger.Error("could not parse message", zap.Error(err))
Error("could not parse message %v", err)
}
n.MsgChan <- envelope
}
@ -565,7 +566,7 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) {
topicHealth := topicHealth{}
err := json.Unmarshal([]byte(eventStr), &topicHealth)
if err != nil {
n.logger.Error("could not parse topic health change", zap.Error(err))
Error("could not parse topic health change %v", err)
}
n.TopicHealthChan <- topicHealth
}
@ -575,21 +576,20 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) {
connectionChange := connectionChange{}
err := json.Unmarshal([]byte(eventStr), &connectionChange)
if err != nil {
n.logger.Error("could not parse connection change", zap.Error(err))
Error("could not parse connection change %v", err)
}
n.ConnectionChangeChan <- connectionChange
}
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {
pubsubTopic = ""
} else {
Debug("Fetching number of connected relay peers for %s", n.nodeName)
pubsubTopic := ""
if len(optPubsubTopic) > 0 {
pubsubTopic = optPubsubTopic[0]
}
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
@ -604,13 +604,16 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error()
return 0, errors.New(errMsg)
Error("Failed to convert relay peer count for %s: %v", n.nodeName, err)
return 0, err
}
Debug("Successfully fetched number of connected relay peers for %s: %d", n.nodeName, numPeers)
return numPeers, nil
}
errMsg := "error GetNumConnectedRelayPeers: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg)
return 0, errors.New(errMsg)
}
@ -634,8 +637,15 @@ func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
}
func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
wg := sync.WaitGroup{}
if n == nil {
err := errors.New("waku node is nil")
Error("Failed to get connected peers %v", err)
return nil, err
}
Debug("Fetching connected peers for %v", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
@ -646,23 +656,29 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
Debug("No connected peers found for " + n.nodeName)
return nil, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
peerIDs := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
for _, peerID := range peerIDs {
id, err := peer.Decode(peerID)
if err != nil {
return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err)
Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err))
return nil, err
}
peers = append(peers, id)
}
Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers)))
return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg)
errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg))
return nil, errors.New(errMsg)
}
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
@ -965,54 +981,74 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D
}
func (n *WakuNode) Start() error {
wg := sync.WaitGroup{}
Debug("Starting %s", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStart(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully started %s", n.nodeName)
return nil
}
errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to start %s: %s", n.nodeName, errMsg)
return errors.New(errMsg)
}
func (n *WakuNode) Stop() error {
wg := sync.WaitGroup{}
Debug("Stopping %s", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStop(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
unregisterNode(n)
Debug("Successfully stopped %s", n.nodeName)
return nil
}
errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to stop %s: %s", n.nodeName, errMsg)
return errors.New(errMsg)
}
func (n *WakuNode) Destroy() error {
wg := sync.WaitGroup{}
if n == nil {
err := errors.New("waku node is nil")
Error("Failed to destroy %v", err)
return err
}
Debug("Destroying %v", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuDestroy(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully destroyed " + n.nodeName)
return nil
}
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg))
return errors.New(errMsg)
}
@ -1240,11 +1276,24 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p
}
func (n *WakuNode) GetNumConnectedPeers() (int, error) {
peers, err := n.GetConnectedPeers()
if err != nil {
if n == nil {
err := errors.New("waku node is nil")
Error("Failed to get number of connected peers %v", err)
return 0, err
}
return len(peers), nil
Debug("Fetching number of connected peers for %v", n.nodeName)
peers, err := n.GetConnectedPeers()
if err != nil {
Error("Failed to fetch connected peers for %v %v ", n.nodeName, err)
return 0, err
}
numPeers := len(peers)
Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers))
return numPeers, nil
}
func getContextTimeoutMilliseconds(ctx context.Context) int {
@ -1259,17 +1308,18 @@ func FormatWakuRelayTopic(clusterId uint16, shard uint16) string {
return fmt.Sprintf("/waku/2/rs/%d/%d", clusterId, shard)
}
func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (int, int, error) {
func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) {
if tcpPort == 0 {
for i := 0; i < 10; i++ {
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0"))
if err != nil {
logger.Warn("unable to resolve tcp addr: %v", zap.Error(err))
Warn("unable to resolve tcp addr: %v", zap.Error(err))
continue
}
tcpListener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
logger.Warn("unable to listen on addr", zap.Stringer("addr", tcpAddr), zap.Error(err))
Warn("unable to listen on addr: addr=%v, error=%v", tcpAddr, err)
continue
}
tcpPort = tcpListener.Addr().(*net.TCPAddr).Port
@ -1285,13 +1335,14 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in
for i := 0; i < 10; i++ {
udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("localhost", "0"))
if err != nil {
logger.Warn("unable to resolve udp addr: %v", zap.Error(err))
Warn("unable to resolve udp addr: %v", err)
continue
}
udpListener, err := net.ListenUDP("udp", udpAddr)
if err != nil {
logger.Warn("unable to listen on addr", zap.Stringer("addr", udpAddr), zap.Error(err))
Warn("unable to listen on addr: addr=%v, error=%v", udpAddr, err)
continue
}
@ -1306,3 +1357,109 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in
return tcpPort, discV5UDPPort, nil
}
// Create & start node
func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) {
Debug("Initializing %s", nodeName)
var nodeCfg WakuConfig
if customCfg == nil {
nodeCfg = DefaultWakuConfig
} else {
nodeCfg = *customCfg
}
Debug("Creating %s", nodeName)
node, err := NewWakuNode(&nodeCfg, nodeName)
if err != nil {
Error("Failed to create %s: %v", nodeName, err)
return nil, err
}
Debug("Starting %s", nodeName)
if err := node.Start(); err != nil {
Error("Failed to start %s: %v", nodeName, err)
return nil, err
}
Debug("Successfully started %s", nodeName)
return node, nil
}
func (n *WakuNode) StopAndDestroy() error {
if n == nil {
err := errors.New("waku node is nil")
Error("Failed to stop and destroy: %v", err)
return err
}
Debug("Stopping %s", n.nodeName)
err := n.Stop()
if err != nil {
Error("Failed to stop %s: %v", n.nodeName, err)
return err
}
Debug("Destroying %s", n.nodeName)
err = n.Destroy()
if err != nil {
Error("Failed to destroy %s: %v", n.nodeName, err)
return err
}
Debug("Successfully stopped and destroyed %s", n.nodeName)
return nil
}
func (n *WakuNode) ConnectPeer(targetNode *WakuNode) error {
Debug("Connecting %s to %s", n.nodeName, targetNode.nodeName)
targetPeerID, err := targetNode.PeerID()
if err != nil {
Error("Failed to get PeerID of target node %s: %v", targetNode.nodeName, err)
return err
}
targetAddr, err := targetNode.ListenAddresses()
if err != nil || len(targetAddr) == 0 {
Error("Failed to get listen addresses for target node %s: %v", targetNode.nodeName, err)
return errors.New("target node has no listen addresses")
}
Debug("Attempting connection to peer %s", targetPeerID.String())
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = n.Connect(ctx, targetAddr[0])
if err != nil {
Error("Failed to connect to peer %s: %v", targetPeerID.String(), err)
return err
}
Debug("Successfully connected %s to %s", n.nodeName, targetNode.nodeName)
return nil
}
func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
Debug("Disconnecting %s from %s", n.nodeName, target.nodeName)
targetPeerID, err := target.PeerID()
if err != nil {
Error("Failed to get PeerID of target node %s: %v", target.nodeName, err)
return err
}
err = n.DisconnectPeerByID(targetPeerID)
if err != nil {
Error("Failed to disconnect peer %s: %v", targetPeerID.String(), err)
return err
}
Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName)
return nil
}

View File

@ -37,9 +37,6 @@ func TestBasicWaku(t *testing.T) {
// ctx := context.Background()
logger, err := zap.NewDevelopment()
require.NoError(t, err)
nwakuConfig := WakuConfig{
Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
Relay: true,
@ -55,7 +52,7 @@ func TestBasicWaku(t *testing.T) {
storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err)
w, err := NewWakuNode(&nwakuConfig, logger.Named("nwaku"))
w, err := NewWakuNode(&nwakuConfig, "nwaku")
require.NoError(t, err)
require.NoError(t, w.Start())
@ -185,15 +182,13 @@ func TestBasicWaku(t *testing.T) {
*/
require.NoError(t, w.Stop())
}
func TestPeerExchange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will be discovered by PeerExchange
discV5NodeWakuConfig := WakuConfig{
Relay: true,
@ -206,7 +201,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: tcpPort,
}
discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node"))
discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, "discV5Node")
require.NoError(t, err)
require.NoError(t, discV5Node.Start())
@ -216,7 +211,7 @@ func TestPeerExchange(t *testing.T) {
discv5NodeEnr, err := discV5Node.ENR()
require.NoError(t, err)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node which serves as PeerExchange server
@ -232,7 +227,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: tcpPort,
}
pxServerNode, err := NewWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode"))
pxServerNode, err := NewWakuNode(&pxServerWakuConfig, "pxServerNode")
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())
@ -265,7 +260,7 @@ func TestPeerExchange(t *testing.T) {
}, options)
require.NoError(t, err)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start light node which uses PeerExchange to discover peers
@ -281,7 +276,7 @@ func TestPeerExchange(t *testing.T) {
PeerExchangeNode: serverNodeMa[0].String(),
}
lightNode, err := NewWakuNode(&pxClientWakuConfig, logger.Named("lightNode"))
lightNode, err := NewWakuNode(&pxClientWakuConfig, "lightNode")
require.NoError(t, err)
require.NoError(t, lightNode.Start())
@ -324,10 +319,8 @@ func TestPeerExchange(t *testing.T) {
}
func TestDnsDiscover(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
nameserver := "8.8.8.8"
@ -340,7 +333,7 @@ func TestDnsDiscover(t *testing.T) {
TcpPort: tcpPort,
}
node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node"))
node, err := NewWakuNode(&nodeWakuConfig, "node")
require.NoError(t, err)
require.NoError(t, node.Start())
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
@ -355,10 +348,8 @@ func TestDnsDiscover(t *testing.T) {
}
func TestDial(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will initiate the dial
@ -372,11 +363,11 @@ func TestDial(t *testing.T) {
TcpPort: tcpPort,
}
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode"))
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode")
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will receive the dial
@ -390,7 +381,7 @@ func TestDial(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode")
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
@ -423,10 +414,7 @@ func TestDial(t *testing.T) {
}
func TestRelay(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will send the message
@ -440,11 +428,11 @@ func TestRelay(t *testing.T) {
TcpPort: tcpPort,
}
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode")
require.NoError(t, err)
require.NoError(t, senderNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will receive the message
@ -457,7 +445,7 @@ func TestRelay(t *testing.T) {
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode")
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
@ -507,12 +495,10 @@ func TestRelay(t *testing.T) {
}
func TestTopicHealth(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
clusterId := uint16(16)
shardId := uint16(64)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node1
@ -526,11 +512,11 @@ func TestTopicHealth(t *testing.T) {
TcpPort: tcpPort,
}
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
node1, err := NewWakuNode(&wakuConfig1, "node1")
require.NoError(t, err)
require.NoError(t, node1.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node2
@ -543,7 +529,7 @@ func TestTopicHealth(t *testing.T) {
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
node2, err := NewWakuNode(&wakuConfig2, "node2")
require.NoError(t, err)
require.NoError(t, node2.Start())
multiaddr2, err := node2.ListenAddresses()
@ -582,12 +568,10 @@ func TestTopicHealth(t *testing.T) {
}
func TestConnectionChange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
clusterId := uint16(16)
shardId := uint16(64)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node1
@ -601,11 +585,11 @@ func TestConnectionChange(t *testing.T) {
TcpPort: tcpPort,
}
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
node1, err := NewWakuNode(&wakuConfig1, "node1")
require.NoError(t, err)
require.NoError(t, node1.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node2
@ -618,7 +602,7 @@ func TestConnectionChange(t *testing.T) {
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
node2, err := NewWakuNode(&wakuConfig2, "node2")
require.NoError(t, err)
require.NoError(t, node2.Start())
multiaddr2, err := node2.ListenAddresses()
@ -673,10 +657,8 @@ func TestConnectionChange(t *testing.T) {
}
func TestStore(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will send the message
@ -692,11 +674,11 @@ func TestStore(t *testing.T) {
LegacyStore: false,
}
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode")
require.NoError(t, err)
require.NoError(t, senderNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will receive the message
@ -711,7 +693,7 @@ func TestStore(t *testing.T) {
TcpPort: tcpPort,
LegacyStore: false,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode")
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
@ -859,7 +841,7 @@ func TestParallelPings(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will initiate the dial
@ -873,11 +855,11 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode"))
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode")
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
receiverNodeWakuConfig1 := WakuConfig{
@ -890,7 +872,7 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, logger.Named("receiverNode1"))
receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, "receiverNode1")
require.NoError(t, err)
require.NoError(t, receiverNode1.Start())
receiverMultiaddr1, err := receiverNode1.ListenAddresses()
@ -898,7 +880,7 @@ func TestParallelPings(t *testing.T) {
require.NotNil(t, receiverMultiaddr1)
require.True(t, len(receiverMultiaddr1) > 0)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
receiverNodeWakuConfig2 := WakuConfig{
@ -911,7 +893,7 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, logger.Named("receiverNode2"))
receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, "receiverNode2")
require.NoError(t, err)
require.NoError(t, receiverNode2.Start())
receiverMultiaddr2, err := receiverNode2.ListenAddresses()
@ -919,7 +901,7 @@ func TestParallelPings(t *testing.T) {
require.NotNil(t, receiverMultiaddr2)
require.True(t, len(receiverMultiaddr2) > 0)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
receiverNodeWakuConfig3 := WakuConfig{
@ -932,7 +914,7 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, logger.Named("receiverNode3"))
receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, "receiverNode3")
require.NoError(t, err)
require.NoError(t, receiverNode3.Start())
receiverMultiaddr3, err := receiverNode3.ListenAddresses()

39
waku/test_data.go Normal file
View File

@ -0,0 +1,39 @@
package waku
import (
"time"
)
var DefaultWakuConfig WakuConfig
func init() {
udpPort, _, err1 := GetFreePortIfNeeded(0, 0)
tcpPort, _, err2 := GetFreePortIfNeeded(0, 0)
if err1 != nil || err2 != nil {
Error("Failed to get free ports %v %v", err1, err2)
}
DefaultWakuConfig = WakuConfig{
Relay: false,
LogLevel: "DEBUG",
Discv5Discovery: true,
ClusterID: 16,
Shards: []uint16{64},
PeerExchange: false,
Store: false,
Filter: false,
Lightpush: false,
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
}
const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node
var DefaultPubsubTopic = "/waku/2/rs/16/64"
var (
MinPort = 1024 // Minimum allowable port (exported)
MaxPort = 65535 // Maximum allowable port (exported)
)