From b69cc5afa608383544948398872ff56c4122f00a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 9 Jan 2025 16:41:59 +0100 Subject: [PATCH] cleanup --- waku/nwaku.go | 21 ++---------- waku/nwaku_test.go | 82 +++++++++++++++++----------------------------- 2 files changed, 33 insertions(+), 70 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index e0742c2..5deab56 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -317,7 +317,6 @@ import ( "encoding/json" "errors" "fmt" - "runtime" "strconv" "strings" "sync" @@ -439,28 +438,13 @@ type WakuNode struct { TopicHealthChan chan topicHealth } -func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { - ctx, cancel := context.WithCancel(ctx) +func newWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { n := &WakuNode{ - cancel: cancel, wakuCfg: config, } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - // defer gocommon.LogOnPanic() - - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - wg.Done() - - <-ctx.Done() - }() - - wg.Wait() jsonConfig, err := json.Marshal(config) if err != nil { @@ -481,10 +465,11 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* wg.Add(1) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() + n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) n.logger = logger.Named("nwaku") - wg.Wait() // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 3d26816..3286526 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -50,9 +50,7 @@ func TestBasicWaku(t *testing.T) { storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - w, err := newWakuNode(ctx, &nwakuConfig, nil) + w, err := newWakuNode(&nwakuConfig, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -104,9 +102,9 @@ func TestBasicWaku(t *testing.T) { require.True(t, isDisconnected, "nwaku should be disconnected from the store node") // Re-connect - ctx2, cancel := context.WithTimeout(context.Background(), requestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() - err = w.Connect(ctx2, storeNodeMa) + err = w.Connect(ctx, storeNodeMa) require.NoError(t, err) // Check that we are connected again @@ -200,9 +198,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60010, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - discV5Node, err := newWakuNode(ctx, &discV5NodeWakuConfig, logger.Named("discV5Node")) + discV5Node, err := newWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node")) require.NoError(t, err) require.NoError(t, discV5Node.Start()) @@ -227,9 +223,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60011, } - ctx2, cancel2 := context.WithCancel(context.Background()) - defer cancel2() - pxServerNode, err := newWakuNode(ctx2, &pxServerWakuConfig, logger.Named("pxServerNode")) + pxServerNode, err := newWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode")) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -274,9 +268,7 @@ func TestPeerExchange(t *testing.T) { PeerExchangeNode: serverNodeMa[0].String(), } - ctx3, cancel3 := context.WithCancel(context.Background()) - defer cancel3() - lightNode, err := newWakuNode(ctx3, &pxClientWakuConfig, logger.Named("lightNode")) + lightNode, err := newWakuNode(&pxClientWakuConfig, logger.Named("lightNode")) require.NoError(t, err) require.NoError(t, lightNode.Start()) @@ -333,17 +325,16 @@ func TestDnsDiscover(t *testing.T) { Discv5UdpPort: 9020, TcpPort: 60020, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - node, err := newWakuNode(ctx, &nodeWakuConfig, logger.Named("node")) + + node, err := newWakuNode(&nodeWakuConfig, logger.Named("node")) require.NoError(t, err) require.NoError(t, node.Start()) time.Sleep(1 * time.Second) sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im" - ctx2, cancel2 := context.WithTimeout(context.TODO(), requestTimeout) - defer cancel2() - res, err := node.DnsDiscovery(ctx2, sampleEnrTree, nameserver) + ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + defer cancel() + res, err := node.DnsDiscovery(ctx, sampleEnrTree, nameserver) require.NoError(t, err) require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query") // Stop nodes @@ -365,12 +356,9 @@ func TestDial(t *testing.T) { TcpPort: 60030, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dialerNode, err := newWakuNode(ctx, &dialerNodeWakuConfig, logger.Named("dialerNode")) + dialerNode, err := newWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode")) require.NoError(t, err) require.NoError(t, dialerNode.Start()) - time.Sleep(1 * time.Second) // start node that will receive the dial receiverNodeWakuConfig := WakuConfig{ @@ -382,12 +370,10 @@ func TestDial(t *testing.T) { Discv5UdpPort: 9031, TcpPort: 60031, } - ctx2, cancel2 := context.WithCancel(context.Background()) - defer cancel2() - receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode")) + + receiverNode, err := newWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) - time.Sleep(1 * time.Second) receiverMultiaddr, err := receiverNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, receiverMultiaddr) @@ -399,9 +385,9 @@ func TestDial(t *testing.T) { require.NoError(t, err) require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Dial - ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel3() - err = dialerNode.Connect(ctx3, receiverMultiaddr[0]) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = dialerNode.Connect(ctx, receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer @@ -431,9 +417,7 @@ func TestRelay(t *testing.T) { TcpPort: 60040, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - senderNode, err := newWakuNode(ctx, &senderNodeWakuConfig, logger.Named("senderNode")) + senderNode, err := newWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) require.NoError(t, err) require.NoError(t, senderNode.Start()) time.Sleep(1 * time.Second) @@ -448,9 +432,7 @@ func TestRelay(t *testing.T) { Discv5UdpPort: 9041, TcpPort: 60041, } - ctx2, cancel2 := context.WithCancel(context.Background()) - defer cancel2() - receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := newWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) time.Sleep(1 * time.Second) @@ -459,9 +441,9 @@ func TestRelay(t *testing.T) { require.NotNil(t, receiverMultiaddr) // Dial so they become peers - ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel3() - err = senderNode.Connect(ctx3, receiverMultiaddr[0]) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = senderNode.Connect(ctx, receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer @@ -480,9 +462,9 @@ func TestRelay(t *testing.T) { } // send message pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) - ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel4() - senderNode.RelayPublish(ctx4, message, pubsubTopic) + ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel2() + senderNode.RelayPublish(ctx2, message, pubsubTopic) // Wait to receive message select { @@ -516,9 +498,7 @@ func TestTopicHealth(t *testing.T) { TcpPort: 60050, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - node1, err := newWakuNode(ctx, &wakuConfig1, logger.Named("node1")) + node1, err := newWakuNode(&wakuConfig1, logger.Named("node1")) require.NoError(t, err) require.NoError(t, node1.Start()) time.Sleep(1 * time.Second) @@ -533,9 +513,7 @@ func TestTopicHealth(t *testing.T) { Discv5UdpPort: 9051, TcpPort: 60051, } - ctx2, cancel2 := context.WithCancel(context.Background()) - defer cancel2() - node2, err := newWakuNode(ctx2, &wakuConfig2, logger.Named("node2")) + node2, err := newWakuNode(&wakuConfig2, logger.Named("node2")) require.NoError(t, err) require.NoError(t, node2.Start()) time.Sleep(1 * time.Second) @@ -544,9 +522,9 @@ func TestTopicHealth(t *testing.T) { require.NotNil(t, multiaddr2) // node1 dials node2 so they become peers - ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel3() - err = node1.Connect(ctx3, multiaddr2[0]) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = node1.Connect(ctx, multiaddr2[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer