This commit is contained in:
Gabriel mermelstein 2025-01-09 16:41:59 +01:00
parent 386c84b859
commit b69cc5afa6
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
2 changed files with 33 additions and 70 deletions

View File

@ -317,7 +317,6 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
@ -439,28 +438,13 @@ type WakuNode struct {
TopicHealthChan chan topicHealth
}
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
ctx, cancel := context.WithCancel(ctx)
func newWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
n := &WakuNode{
cancel: cancel,
wakuCfg: config,
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// defer gocommon.LogOnPanic()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
wg.Done()
<-ctx.Done()
}()
wg.Wait()
jsonConfig, err := json.Marshal(config)
if err != nil {
@ -481,10 +465,11 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*
wg.Add(1)
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
wg.Wait()
n.MsgChan = make(chan common.Envelope, MsgChanBufferSize)
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.logger = logger.Named("nwaku")
wg.Wait()
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)

View File

@ -50,9 +50,7 @@ func TestBasicWaku(t *testing.T) {
storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w, err := newWakuNode(ctx, &nwakuConfig, nil)
w, err := newWakuNode(&nwakuConfig, nil)
require.NoError(t, err)
require.NoError(t, w.Start())
@ -104,9 +102,9 @@ func TestBasicWaku(t *testing.T) {
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
// Re-connect
ctx2, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = w.Connect(ctx2, storeNodeMa)
err = w.Connect(ctx, storeNodeMa)
require.NoError(t, err)
// Check that we are connected again
@ -200,9 +198,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: 60010,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discV5Node, err := newWakuNode(ctx, &discV5NodeWakuConfig, logger.Named("discV5Node"))
discV5Node, err := newWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node"))
require.NoError(t, err)
require.NoError(t, discV5Node.Start())
@ -227,9 +223,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: 60011,
}
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
pxServerNode, err := newWakuNode(ctx2, &pxServerWakuConfig, logger.Named("pxServerNode"))
pxServerNode, err := newWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode"))
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())
@ -274,9 +268,7 @@ func TestPeerExchange(t *testing.T) {
PeerExchangeNode: serverNodeMa[0].String(),
}
ctx3, cancel3 := context.WithCancel(context.Background())
defer cancel3()
lightNode, err := newWakuNode(ctx3, &pxClientWakuConfig, logger.Named("lightNode"))
lightNode, err := newWakuNode(&pxClientWakuConfig, logger.Named("lightNode"))
require.NoError(t, err)
require.NoError(t, lightNode.Start())
@ -333,17 +325,16 @@ func TestDnsDiscover(t *testing.T) {
Discv5UdpPort: 9020,
TcpPort: 60020,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := newWakuNode(ctx, &nodeWakuConfig, logger.Named("node"))
node, err := newWakuNode(&nodeWakuConfig, logger.Named("node"))
require.NoError(t, err)
require.NoError(t, node.Start())
time.Sleep(1 * time.Second)
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
ctx2, cancel2 := context.WithTimeout(context.TODO(), requestTimeout)
defer cancel2()
res, err := node.DnsDiscovery(ctx2, sampleEnrTree, nameserver)
ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout)
defer cancel()
res, err := node.DnsDiscovery(ctx, sampleEnrTree, nameserver)
require.NoError(t, err)
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query")
// Stop nodes
@ -365,12 +356,9 @@ func TestDial(t *testing.T) {
TcpPort: 60030,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dialerNode, err := newWakuNode(ctx, &dialerNodeWakuConfig, logger.Named("dialerNode"))
dialerNode, err := newWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode"))
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
time.Sleep(1 * time.Second)
// start node that will receive the dial
receiverNodeWakuConfig := WakuConfig{
@ -382,12 +370,10 @@ func TestDial(t *testing.T) {
Discv5UdpPort: 9031,
TcpPort: 60031,
}
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := newWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
time.Sleep(1 * time.Second)
receiverMultiaddr, err := receiverNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, receiverMultiaddr)
@ -399,9 +385,9 @@ func TestDial(t *testing.T) {
require.NoError(t, err)
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")
// Dial
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel3()
err = dialerNode.Connect(ctx3, receiverMultiaddr[0])
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = dialerNode.Connect(ctx, receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
@ -431,9 +417,7 @@ func TestRelay(t *testing.T) {
TcpPort: 60040,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
senderNode, err := newWakuNode(ctx, &senderNodeWakuConfig, logger.Named("senderNode"))
senderNode, err := newWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
require.NoError(t, err)
require.NoError(t, senderNode.Start())
time.Sleep(1 * time.Second)
@ -448,9 +432,7 @@ func TestRelay(t *testing.T) {
Discv5UdpPort: 9041,
TcpPort: 60041,
}
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := newWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
time.Sleep(1 * time.Second)
@ -459,9 +441,9 @@ func TestRelay(t *testing.T) {
require.NotNil(t, receiverMultiaddr)
// Dial so they become peers
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel3()
err = senderNode.Connect(ctx3, receiverMultiaddr[0])
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = senderNode.Connect(ctx, receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
@ -480,9 +462,9 @@ func TestRelay(t *testing.T) {
}
// send message
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel4()
senderNode.RelayPublish(ctx4, message, pubsubTopic)
ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel2()
senderNode.RelayPublish(ctx2, message, pubsubTopic)
// Wait to receive message
select {
@ -516,9 +498,7 @@ func TestTopicHealth(t *testing.T) {
TcpPort: 60050,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node1, err := newWakuNode(ctx, &wakuConfig1, logger.Named("node1"))
node1, err := newWakuNode(&wakuConfig1, logger.Named("node1"))
require.NoError(t, err)
require.NoError(t, node1.Start())
time.Sleep(1 * time.Second)
@ -533,9 +513,7 @@ func TestTopicHealth(t *testing.T) {
Discv5UdpPort: 9051,
TcpPort: 60051,
}
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
node2, err := newWakuNode(ctx2, &wakuConfig2, logger.Named("node2"))
node2, err := newWakuNode(&wakuConfig2, logger.Named("node2"))
require.NoError(t, err)
require.NoError(t, node2.Start())
time.Sleep(1 * time.Second)
@ -544,9 +522,9 @@ func TestTopicHealth(t *testing.T) {
require.NotNil(t, multiaddr2)
// node1 dials node2 so they become peers
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel3()
err = node1.Connect(ctx3, multiaddr2[0])
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = node1.Connect(ctx, multiaddr2[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer