Adding review points

This commit is contained in:
aya 2025-02-11 16:21:14 +02:00
parent 0940993ce6
commit 9946b3d499
5 changed files with 147 additions and 206 deletions

View File

@ -11,8 +11,8 @@ var (
instance *logrus.Logger
)
// GetLogger ensures we always return the same logger instance
func GetLogger() *logrus.Logger {
// _getLogger ensures we always return the same logger instance (private function)
func _getLogger() *logrus.Logger {
once.Do(func() {
instance = logrus.New()
instance.SetFormatter(&logrus.TextFormatter{
@ -25,15 +25,19 @@ func GetLogger() *logrus.Logger {
// Debug logs a debug message
func Debug(msg string, args ...interface{}) {
GetLogger().WithFields(logrus.Fields{}).Debugf(msg, args...)
_getLogger().WithFields(logrus.Fields{}).Debugf(msg, args...)
}
// Info logs an info message
func Info(msg string, args ...interface{}) {
GetLogger().WithFields(logrus.Fields{}).Infof(msg, args...)
_getLogger().WithFields(logrus.Fields{}).Infof(msg, args...)
}
// Error logs an error message
func Error(msg string, args ...interface{}) {
GetLogger().WithFields(logrus.Fields{}).Errorf(msg, args...)
_getLogger().WithFields(logrus.Fields{}).Errorf(msg, args...)
}
func Warn(msg string, args ...interface{}) {
_getLogger().WithFields(logrus.Fields{}).Warnf(msg, args...)
}

View File

@ -433,18 +433,17 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
type WakuNode struct {
wakuCtx unsafe.Pointer
config *WakuConfig
logger *zap.Logger
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
ConnectionChangeChan chan connectionChange
nodeName string
}
func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) {
Debug("Creating new WakuNode: %v", nodeName)
n := &WakuNode{
config: config,
logger: logger,
config: config,
nodeName: nodeName,
}
wg := sync.WaitGroup{}
@ -455,14 +454,14 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
}
var cJsonConfig = C.CString(string(jsonConfig))
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.free(unsafe.Pointer(cJsonConfig))
defer C.freeResp(resp)
if C.getRet(resp) != C.RET_OK {
errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("error wakuNew for %s: %v", nodeName, errMsg)
return nil, errors.New(errMsg)
}
@ -474,10 +473,10 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize)
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
registerNode(n)
Debug("Successfully created WakuNode: %s", nodeName)
return n, nil
}
@ -539,7 +538,8 @@ func (n *WakuNode) OnEvent(eventStr string) {
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
n.logger.Error("could not unmarshal nwaku event string", zap.Error(err))
Error("could not unmarshal nwaku event string: %v", err)
return
}
@ -556,7 +556,7 @@ func (n *WakuNode) OnEvent(eventStr string) {
func (n *WakuNode) parseMessageEvent(eventStr string) {
envelope, err := common.NewEnvelope(eventStr)
if err != nil {
n.logger.Error("could not parse message", zap.Error(err))
Error("could not parse message %v", err)
}
n.MsgChan <- envelope
}
@ -566,7 +566,7 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) {
topicHealth := topicHealth{}
err := json.Unmarshal([]byte(eventStr), &topicHealth)
if err != nil {
n.logger.Error("could not parse topic health change", zap.Error(err))
Error("could not parse topic health change %v", err)
}
n.TopicHealthChan <- topicHealth
}
@ -576,15 +576,14 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) {
connectionChange := connectionChange{}
err := json.Unmarshal([]byte(eventStr), &connectionChange)
if err != nil {
n.logger.Error("could not parse connection change", zap.Error(err))
Error("could not parse connection change %v", err)
}
n.ConnectionChangeChan <- connectionChange
}
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
logger := n.logger.Named(n.nodeName)
logger.Debug("Fetching number of connected relay peers for " + n.nodeName)
Debug("Fetching number of connected relay peers for %s", n.nodeName)
pubsubTopic := ""
if len(optPubsubTopic) > 0 {
pubsubTopic = optPubsubTopic[0]
@ -605,16 +604,15 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
logger.Error("Failed to convert relay peer count for "+n.nodeName, zap.Error(err))
Error("Failed to convert relay peer count for %s: %v", n.nodeName, err)
return 0, err
}
logger.Debug("Successfully fetched number of connected relay peers for "+n.nodeName, zap.Int("count", numPeers))
Debug("Successfully fetched number of connected relay peers for %s: %d", n.nodeName, numPeers)
return numPeers, nil
}
errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to get number of connected relay peers for "+n.nodeName, zap.String("error", errMsg))
Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg)
return 0, errors.New(errMsg)
}
@ -640,14 +638,12 @@ func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to get connected peers", zap.Error(err))
Error("Failed to get connected peers %v", err)
return nil, err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Fetching connected peers for " + n.nodeName)
Debug("Fetching connected peers for %v", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
@ -660,7 +656,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
logger.Debug("No connected peers found for " + n.nodeName)
Debug("No connected peers found for " + n.nodeName)
return nil, nil
}
@ -669,18 +665,18 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
for _, peerID := range peerIDs {
id, err := peer.Decode(peerID)
if err != nil {
logger.Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err))
Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err))
return nil, err
}
peers = append(peers, id)
}
logger.Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers)))
Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers)))
return peers, nil
}
errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg))
Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg))
return nil, errors.New(errMsg)
}
@ -985,66 +981,57 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D
}
func (n *WakuNode) Start() error {
wg := sync.WaitGroup{}
Debug("Starting %s", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStart(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully started %s", n.nodeName)
return nil
}
errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to start %s: %s", n.nodeName, errMsg)
return errors.New(errMsg)
}
func (n *WakuNode) Stop() error {
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to stop", zap.Error(err))
return err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Stopping " + n.nodeName)
Debug("Stopping %s", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStop(n.wakuCtx, resp) // Calls the C function to stop the Waku node
C.cGoWakuStop(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
unregisterNode(n) // Ensure the node is properly unregistered
logger.Debug("Successfully stopped " + n.nodeName)
Debug("Successfully stopped %s", n.nodeName)
return nil
}
// Extract error message from C response
errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to stop "+n.nodeName, zap.String("error", errMsg))
Error("Failed to stop %s: %s", n.nodeName, errMsg)
return errors.New(errMsg)
}
func (n *WakuNode) Destroy() error {
if n == nil {
logger := GetLogger()
err := errors.New("Waku node is nil")
logger.Error("Failed to destroy", zap.Error(err))
err := errors.New("waku node is nil")
Error("Failed to destroy %v", err)
return err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Destroying " + n.nodeName)
Debug("Destroying %v", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
@ -1055,12 +1042,12 @@ func (n *WakuNode) Destroy() error {
wg.Wait()
if C.getRet(resp) == C.RET_OK {
logger.Debug("Successfully destroyed " + n.nodeName)
Debug("Successfully destroyed " + n.nodeName)
return nil
}
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
logger.Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg))
Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg))
return errors.New(errMsg)
}
@ -1290,23 +1277,21 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p
func (n *WakuNode) GetNumConnectedPeers() (int, error) {
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to get number of connected peers", zap.Error(err))
Error("Failed to get number of connected peers %v", err)
return 0, err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Fetching number of connected peers for " + n.nodeName)
Debug("Fetching number of connected peers for %v", n.nodeName)
peers, err := n.GetConnectedPeers()
if err != nil {
logger.Error("Failed to fetch connected peers for "+n.nodeName, zap.Error(err))
Error("Failed to fetch connected peers for %v %v ", n.nodeName, err)
return 0, err
}
numPeers := len(peers)
logger.Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers))
Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers))
return numPeers, nil
}
@ -1323,17 +1308,18 @@ func FormatWakuRelayTopic(clusterId uint16, shard uint16) string {
return fmt.Sprintf("/waku/2/rs/%d/%d", clusterId, shard)
}
func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (int, int, error) {
func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) {
if tcpPort == 0 {
for i := 0; i < 10; i++ {
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0"))
if err != nil {
logger.Warn("unable to resolve tcp addr: %v", zap.Error(err))
Warn("unable to resolve tcp addr: %v", zap.Error(err))
continue
}
tcpListener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
logger.Warn("unable to listen on addr", zap.Stringer("addr", tcpAddr), zap.Error(err))
Warn("unable to listen on addr: addr=%v, error=%v", tcpAddr, err)
continue
}
tcpPort = tcpListener.Addr().(*net.TCPAddr).Port
@ -1349,13 +1335,14 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in
for i := 0; i < 10; i++ {
udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("localhost", "0"))
if err != nil {
logger.Warn("unable to resolve udp addr: %v", zap.Error(err))
Warn("unable to resolve udp addr: %v", err)
continue
}
udpListener, err := net.ListenUDP("udp", udpAddr)
if err != nil {
logger.Warn("unable to listen on addr", zap.Stringer("addr", udpAddr), zap.Error(err))
Warn("unable to listen on addr: addr=%v, error=%v", udpAddr, err)
continue
}
@ -1373,10 +1360,8 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int, logger *zap.Logger) (in
// Create & start node
func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) {
logrusLogger := GetLogger()
logger := logrusToZap(logrusLogger, nodeName) // Convert logrus to zap with node name
logger.Debug("Initializing " + nodeName)
Debug("Initializing %s", nodeName)
var nodeCfg WakuConfig
if customCfg == nil {
@ -1385,105 +1370,96 @@ func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) {
nodeCfg = *customCfg
}
nodeCfg.Discv5UdpPort = GenerateUniquePort()
nodeCfg.TcpPort = GenerateUniquePort()
logger.Debug("Creating " + nodeName)
node, err := NewWakuNode(&nodeCfg, logger)
Debug("Creating %s", nodeName)
node, err := NewWakuNode(&nodeCfg, nodeName)
if err != nil {
logger.Error("Failed to create "+nodeName, zap.Error(err))
Error("Failed to create %s: %v", nodeName, err)
return nil, err
}
node.nodeName = nodeName
logger.Debug("Starting " + nodeName)
Debug("Starting %s", nodeName)
if err := node.Start(); err != nil {
logger.Error("Failed to start "+nodeName, zap.Error(err))
Error("Failed to start %s: %v", nodeName, err)
return nil, err
}
logger.Debug("Successfully started " + nodeName)
Debug("Successfully started %s", nodeName)
return node, nil
}
func (n *WakuNode) StopAndDestroy() error {
if n == nil {
logger := GetLogger()
err := errors.New("waku node is nil")
logger.Error("Failed to stop and destroy", zap.Error(err))
Error("Failed to stop and destroy: %v", err)
return err
}
logger := n.logger.Named(n.nodeName)
logger.Debug("Stopping " + n.nodeName)
Debug("Stopping %s", n.nodeName)
err := n.Stop()
if err != nil {
logger.Error("Failed to stop "+n.nodeName, zap.Error(err))
Error("Failed to stop %s: %v", n.nodeName, err)
return err
}
logger.Debug("Destroying " + n.nodeName)
Debug("Destroying %s", n.nodeName)
err = n.Destroy()
if err != nil {
logger.Error("Failed to destroy "+n.nodeName, zap.Error(err))
Error("Failed to destroy %s: %v", n.nodeName, err)
return err
}
logger.Debug("Successfully stopped and destroyed " + n.nodeName)
Debug("Successfully stopped and destroyed %s", n.nodeName)
return nil
}
func (n *WakuNode) ConnectPeer(targetNode *WakuNode) error {
logger := n.logger.Named(n.nodeName)
logger.Debug("Connecting " + n.nodeName + " to " + targetNode.nodeName)
Debug("Connecting %s to %s", n.nodeName, targetNode.nodeName)
targetPeerID, err := targetNode.PeerID()
if err != nil {
logger.Error("Failed to get PeerID of target node "+targetNode.nodeName, zap.Error(err))
Error("Failed to get PeerID of target node %s: %v", targetNode.nodeName, err)
return err
}
targetAddr, err := targetNode.ListenAddresses()
if err != nil || len(targetAddr) == 0 {
logger.Error("Failed to get listen addresses for target node "+targetNode.nodeName, zap.Error(err))
Error("Failed to get listen addresses for target node %s: %v", targetNode.nodeName, err)
return errors.New("target node has no listen addresses")
}
logger.Debug("Attempting connection to peer " + targetPeerID.String())
Debug("Attempting connection to peer %s", targetPeerID.String())
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = n.Connect(ctx, targetAddr[0])
if err != nil {
logger.Error("Failed to connect to peer "+targetPeerID.String(), zap.Error(err))
Error("Failed to connect to peer %s: %v", targetPeerID.String(), err)
return err
}
logger.Debug("Successfully connected " + n.nodeName + " to " + targetNode.nodeName)
Debug("Successfully connected %s to %s", n.nodeName, targetNode.nodeName)
return nil
}
func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
logger := n.logger.Named(n.nodeName)
logger.Debug("Disconnecting " + n.nodeName + " from " + target.nodeName)
Debug("Disconnecting %s from %s", n.nodeName, target.nodeName)
targetPeerID, err := target.PeerID()
if err != nil {
logger.Error("Failed to get PeerID of target node "+target.nodeName, zap.Error(err))
Error("Failed to get PeerID of target node %s: %v", target.nodeName, err)
return err
}
err = n.DisconnectPeerByID(targetPeerID)
if err != nil {
logger.Error("Failed to disconnect peer "+targetPeerID.String(), zap.Error(err))
Error("Failed to disconnect peer %s: %v", targetPeerID.String(), err)
return err
}
logger.Debug("Successfully disconnected " + n.nodeName + " from " + target.nodeName)
Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName)
return nil
}

View File

@ -37,9 +37,6 @@ func TestBasicWaku(t *testing.T) {
// ctx := context.Background()
logger, err := zap.NewDevelopment()
require.NoError(t, err)
nwakuConfig := WakuConfig{
Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
Relay: true,
@ -55,7 +52,7 @@ func TestBasicWaku(t *testing.T) {
storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err)
w, err := NewWakuNode(&nwakuConfig, logger.Named("nwaku"))
w, err := NewWakuNode(&nwakuConfig, "nwaku")
require.NoError(t, err)
require.NoError(t, w.Start())
@ -185,15 +182,13 @@ func TestBasicWaku(t *testing.T) {
*/
require.NoError(t, w.Stop())
}
func TestPeerExchange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will be discovered by PeerExchange
discV5NodeWakuConfig := WakuConfig{
Relay: true,
@ -206,7 +201,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: tcpPort,
}
discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node"))
discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, "discV5Node")
require.NoError(t, err)
require.NoError(t, discV5Node.Start())
@ -216,7 +211,7 @@ func TestPeerExchange(t *testing.T) {
discv5NodeEnr, err := discV5Node.ENR()
require.NoError(t, err)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node which serves as PeerExchange server
@ -232,7 +227,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: tcpPort,
}
pxServerNode, err := NewWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode"))
pxServerNode, err := NewWakuNode(&pxServerWakuConfig, "pxServerNode")
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())
@ -265,7 +260,7 @@ func TestPeerExchange(t *testing.T) {
}, options)
require.NoError(t, err)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start light node which uses PeerExchange to discover peers
@ -281,7 +276,7 @@ func TestPeerExchange(t *testing.T) {
PeerExchangeNode: serverNodeMa[0].String(),
}
lightNode, err := NewWakuNode(&pxClientWakuConfig, logger.Named("lightNode"))
lightNode, err := NewWakuNode(&pxClientWakuConfig, "lightNode")
require.NoError(t, err)
require.NoError(t, lightNode.Start())
@ -324,10 +319,8 @@ func TestPeerExchange(t *testing.T) {
}
func TestDnsDiscover(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
nameserver := "8.8.8.8"
@ -340,7 +333,7 @@ func TestDnsDiscover(t *testing.T) {
TcpPort: tcpPort,
}
node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node"))
node, err := NewWakuNode(&nodeWakuConfig, "node")
require.NoError(t, err)
require.NoError(t, node.Start())
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
@ -355,10 +348,8 @@ func TestDnsDiscover(t *testing.T) {
}
func TestDial(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will initiate the dial
@ -372,11 +363,11 @@ func TestDial(t *testing.T) {
TcpPort: tcpPort,
}
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode"))
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode")
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will receive the dial
@ -390,7 +381,7 @@ func TestDial(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode")
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
@ -423,10 +414,7 @@ func TestDial(t *testing.T) {
}
func TestRelay(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will send the message
@ -440,11 +428,11 @@ func TestRelay(t *testing.T) {
TcpPort: tcpPort,
}
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode")
require.NoError(t, err)
require.NoError(t, senderNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will receive the message
@ -457,7 +445,7 @@ func TestRelay(t *testing.T) {
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode")
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
@ -507,12 +495,10 @@ func TestRelay(t *testing.T) {
}
func TestTopicHealth(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
clusterId := uint16(16)
shardId := uint16(64)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node1
@ -526,11 +512,11 @@ func TestTopicHealth(t *testing.T) {
TcpPort: tcpPort,
}
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
node1, err := NewWakuNode(&wakuConfig1, "node1")
require.NoError(t, err)
require.NoError(t, node1.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node2
@ -543,7 +529,7 @@ func TestTopicHealth(t *testing.T) {
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
node2, err := NewWakuNode(&wakuConfig2, "node2")
require.NoError(t, err)
require.NoError(t, node2.Start())
multiaddr2, err := node2.ListenAddresses()
@ -582,12 +568,10 @@ func TestTopicHealth(t *testing.T) {
}
func TestConnectionChange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
clusterId := uint16(16)
shardId := uint16(64)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node1
@ -601,11 +585,11 @@ func TestConnectionChange(t *testing.T) {
TcpPort: tcpPort,
}
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
node1, err := NewWakuNode(&wakuConfig1, "node1")
require.NoError(t, err)
require.NoError(t, node1.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node2
@ -618,7 +602,7 @@ func TestConnectionChange(t *testing.T) {
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
node2, err := NewWakuNode(&wakuConfig2, "node2")
require.NoError(t, err)
require.NoError(t, node2.Start())
multiaddr2, err := node2.ListenAddresses()
@ -673,10 +657,8 @@ func TestConnectionChange(t *testing.T) {
}
func TestStore(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will send the message
@ -692,11 +674,11 @@ func TestStore(t *testing.T) {
LegacyStore: false,
}
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
senderNode, err := NewWakuNode(&senderNodeWakuConfig, "senderNode")
require.NoError(t, err)
require.NoError(t, senderNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will receive the message
@ -711,7 +693,7 @@ func TestStore(t *testing.T) {
TcpPort: tcpPort,
LegacyStore: false,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, "receiverNode")
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
@ -859,7 +841,7 @@ func TestParallelPings(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
// start node that will initiate the dial
@ -873,11 +855,11 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode"))
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, "dialerNode")
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
receiverNodeWakuConfig1 := WakuConfig{
@ -890,7 +872,7 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, logger.Named("receiverNode1"))
receiverNode1, err := NewWakuNode(&receiverNodeWakuConfig1, "receiverNode1")
require.NoError(t, err)
require.NoError(t, receiverNode1.Start())
receiverMultiaddr1, err := receiverNode1.ListenAddresses()
@ -898,7 +880,7 @@ func TestParallelPings(t *testing.T) {
require.NotNil(t, receiverMultiaddr1)
require.True(t, len(receiverMultiaddr1) > 0)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
receiverNodeWakuConfig2 := WakuConfig{
@ -911,7 +893,7 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, logger.Named("receiverNode2"))
receiverNode2, err := NewWakuNode(&receiverNodeWakuConfig2, "receiverNode2")
require.NoError(t, err)
require.NoError(t, receiverNode2.Start())
receiverMultiaddr2, err := receiverNode2.ListenAddresses()
@ -919,7 +901,7 @@ func TestParallelPings(t *testing.T) {
require.NotNil(t, receiverMultiaddr2)
require.True(t, len(receiverMultiaddr2) > 0)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0, logger)
tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0)
require.NoError(t, err)
receiverNodeWakuConfig3 := WakuConfig{
@ -932,7 +914,7 @@ func TestParallelPings(t *testing.T) {
TcpPort: tcpPort,
}
receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, logger.Named("receiverNode3"))
receiverNode3, err := NewWakuNode(&receiverNodeWakuConfig3, "receiverNode3")
require.NoError(t, err)
require.NoError(t, receiverNode3.Start())
receiverMultiaddr3, err := receiverNode3.ListenAddresses()

View File

@ -7,13 +7,9 @@ import (
"net/http"
"os"
"strconv"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"golang.org/x/exp/rand"
)
type NwakuInfo struct {
@ -21,31 +17,6 @@ type NwakuInfo struct {
EnrUri string `json:"enrUri"`
}
func logrusToZap(log *logrus.Logger, nodeName string) *zap.Logger {
config := zap.NewDevelopmentConfig()
config.EncoderConfig.TimeKey = "" // Remove timestamp duplication
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
zapLogger, _ := config.Build()
return zapLogger.Named(nodeName)
}
func GenerateUniquePort() int {
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) // Local RNG instance
for {
port := rng.Intn(MaxPort-MinPort+1) + MinPort
portsMutex.Lock()
if !usedPorts[port] {
usedPorts[port] = true
portsMutex.Unlock()
return port
}
portsMutex.Unlock()
}
}
func GetNwakuInfo(host *string, port *int) (NwakuInfo, error) {
nwakuRestPort := 8645
if port != nil {

View File

@ -1,31 +1,39 @@
package waku
import (
"sync"
"time"
)
var DefaultWakuConfig WakuConfig
func init() {
udpPort, _, err1 := GetFreePortIfNeeded(0, 0)
tcpPort, _, err2 := GetFreePortIfNeeded(0, 0)
if err1 != nil || err2 != nil {
Error("Failed to get free ports %v %v", err1, err2)
}
DefaultWakuConfig = WakuConfig{
Relay: false,
LogLevel: "DEBUG",
Discv5Discovery: true,
ClusterID: 16,
Shards: []uint16{64},
PeerExchange: false,
Store: false,
Filter: false,
Lightpush: false,
Discv5UdpPort: udpPort,
TcpPort: tcpPort,
}
}
const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node
var DefaultPubsubTopic = "/waku/2/rs/16/64"
var (
MinPort = 1024 // Minimum allowable port (exported)
MaxPort = 65535 // Maximum allowable port (exported)
usedPorts = make(map[int]bool) // Tracks used ports (internal to package)
portsMutex sync.Mutex // Ensures thread-safe access to usedPorts
MinPort = 1024 // Minimum allowable port (exported)
MaxPort = 65535 // Maximum allowable port (exported)
)
// Default configuration values
var DefaultWakuConfig = WakuConfig{
Relay: false,
LogLevel: "DEBUG",
Discv5Discovery: true,
ClusterID: 16,
Shards: []uint16{64},
PeerExchange: false,
Store: false,
Filter: false,
Lightpush: false,
Discv5UdpPort: GenerateUniquePort(),
TcpPort: GenerateUniquePort(),
}