diff --git a/node/status_node_services.go b/node/status_node_services.go index f2716f29d..6136943d6 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -721,6 +721,13 @@ func (b *StatusNode) Cleanup() error { } } + if b.pendingTracker != nil { + err := b.pendingTracker.Stop() + if err != nil { + return err + } + } + return nil } diff --git a/protocol/messenger.go b/protocol/messenger.go index f857836ef..69ddcecb0 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -3513,7 +3513,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte statusMessages := handleMessagesResponse.StatusMessages if m.telemetryClient != nil { - m.telemetryClient.PushReceivedMessages(telemetry.ReceivedMessages{ + m.telemetryClient.PushReceivedMessages(m.ctx, telemetry.ReceivedMessages{ Filter: filter, SSHMessage: shhMessage, Messages: statusMessages, diff --git a/services/wallet/onramp/mock/types.go b/services/wallet/onramp/mock/types.go index d00ab03ea..d739cefbb 100644 --- a/services/wallet/onramp/mock/types.go +++ b/services/wallet/onramp/mock/types.go @@ -13,8 +13,9 @@ import ( context "context" reflect "reflect" - onramp "github.com/status-im/status-go/services/wallet/onramp" gomock "go.uber.org/mock/gomock" + + onramp "github.com/status-im/status-go/services/wallet/onramp" ) // MockProvider is a mock of Provider interface. diff --git a/services/wallet/router/pathprocessor/mock_pathprocessor/processor.go b/services/wallet/router/pathprocessor/mock_pathprocessor/processor.go index fd217c338..d4d949844 100644 --- a/services/wallet/router/pathprocessor/mock_pathprocessor/processor.go +++ b/services/wallet/router/pathprocessor/mock_pathprocessor/processor.go @@ -8,9 +8,10 @@ import ( big "math/big" reflect "reflect" + gomock "github.com/golang/mock/gomock" + common "github.com/ethereum/go-ethereum/common" types "github.com/ethereum/go-ethereum/core/types" - gomock "github.com/golang/mock/gomock" account "github.com/status-im/status-go/account" types0 "github.com/status-im/status-go/eth-node/types" pathprocessor "github.com/status-im/status-go/services/wallet/router/pathprocessor" diff --git a/services/wallet/thirdparty/paraswap/mock/types.go b/services/wallet/thirdparty/paraswap/mock/types.go index 32d3fc719..9f3845005 100644 --- a/services/wallet/thirdparty/paraswap/mock/types.go +++ b/services/wallet/thirdparty/paraswap/mock/types.go @@ -10,8 +10,9 @@ import ( big "math/big" reflect "reflect" - common "github.com/ethereum/go-ethereum/common" gomock "github.com/golang/mock/gomock" + + common "github.com/ethereum/go-ethereum/common" paraswap "github.com/status-im/status-go/services/wallet/thirdparty/paraswap" ) diff --git a/telemetry/client.go b/telemetry/client.go index 1aa9f6ba5..85b6112bb 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -42,30 +42,30 @@ type TelemetryRequest struct { TelemetryData *json.RawMessage `json:"telemetry_data"` } -func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { - c.processAndPushTelemetry(receivedMessages) +func (c *Client) PushReceivedMessages(ctx context.Context, receivedMessages ReceivedMessages) { + c.processAndPushTelemetry(ctx, receivedMessages) } -func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { - c.processAndPushTelemetry(sentEnvelope) +func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope wakuv2.SentEnvelope) { + c.processAndPushTelemetry(ctx, sentEnvelope) } -func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) { - c.processAndPushTelemetry(receivedEnvelope) +func (c *Client) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *v2protocol.Envelope) { + c.processAndPushTelemetry(ctx, receivedEnvelope) } -func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { - c.processAndPushTelemetry(errorSendingEnvelope) +func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { + c.processAndPushTelemetry(ctx, errorSendingEnvelope) } -func (c *Client) PushPeerCount(peerCount int) { +func (c *Client) PushPeerCount(ctx context.Context, peerCount int) { if peerCount != c.lastPeerCount { c.lastPeerCount = peerCount - c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) + c.processAndPushTelemetry(ctx, PeerCount{PeerCount: peerCount}) } } -func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) { +func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) { for peerID, failures := range peerConnFailures { if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { if failures == lastFailures { @@ -73,7 +73,7 @@ func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) { } } c.lastPeerConnFailures[peerID] = failures - c.processAndPushTelemetry(PeerConnFailure{FailedPeerId: peerID, FailureCount: failures}) + c.processAndPushTelemetry(ctx, PeerConnFailure{FailedPeerId: peerID, FailureCount: failures}) } } @@ -154,7 +154,6 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str func (c *Client) Start(ctx context.Context) { go func() { - for { select { case telemetryRequest := <-c.telemetryCh: @@ -199,7 +198,7 @@ func (c *Client) Start(ctx context.Context) { }() } -func (c *Client) processAndPushTelemetry(data interface{}) { +func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) { var telemetryRequest TelemetryRequest switch v := data.(type) { case ReceivedMessages: @@ -243,7 +242,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) { return } - c.telemetryCh <- telemetryRequest + select { + case <-ctx.Done(): + return + case c.telemetryCh <- telemetryRequest: + } + c.nextIdLock.Lock() c.nextId++ c.nextIdLock.Unlock() diff --git a/telemetry/client_test.go b/telemetry/client_test.go index dbff6db27..d6482bb8d 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -140,7 +140,7 @@ func TestClient_ProcessReceivedMessages(t *testing.T) { // Send the telemetry request client.Start(ctx) - client.PushReceivedMessages(data) + client.PushReceivedMessages(ctx, data) }) } @@ -156,7 +156,7 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) { // Send the telemetry request client.Start(ctx) - client.PushReceivedEnvelope(envelope) + client.PushReceivedEnvelope(ctx, envelope) }) } @@ -175,7 +175,7 @@ func TestClient_ProcessSentEnvelope(t *testing.T) { // Send the telemetry request client.Start(ctx) - client.PushSentEnvelope(sentEnvelope) + client.PushSentEnvelope(ctx, sentEnvelope) }) } @@ -276,11 +276,13 @@ func TestRetryCache(t *testing.T) { })) defer mockServer.Close() + ctx := context.Background() + client := createClient(t, mockServer.URL) - client.Start(context.Background()) + client.Start(ctx) for i := 0; i < 3; i++ { - client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: testContentTopic, Version: proto.Uint32(0), @@ -292,7 +294,7 @@ func TestRetryCache(t *testing.T) { require.Equal(t, 3, len(client.telemetryRetryCache)) - client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: testContentTopic, Version: proto.Uint32(0), @@ -307,11 +309,13 @@ func TestRetryCache(t *testing.T) { } func TestRetryCacheCleanup(t *testing.T) { + ctx := context.Background() + client := createClient(t, "") - client.Start(context.Background()) + client.Start(ctx) for i := 0; i < 6000; i++ { - client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: testContentTopic, Version: proto.Uint32(0), @@ -323,7 +327,7 @@ func TestRetryCacheCleanup(t *testing.T) { require.Equal(t, 6000, len(client.telemetryRetryCache)) - client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: testContentTopic, Version: proto.Uint32(0), @@ -403,7 +407,7 @@ func TestPeerId(t *testing.T) { withMockServer(t, ReceivedEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { client.Start(ctx) - client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: testContentTopic, Version: proto.Uint32(0), diff --git a/transactions/mock_transactor/transactor.go b/transactions/mock_transactor/transactor.go index e927f7997..25cacfeed 100644 --- a/transactions/mock_transactor/transactor.go +++ b/transactions/mock_transactor/transactor.go @@ -8,9 +8,10 @@ import ( big "math/big" reflect "reflect" + gomock "github.com/golang/mock/gomock" + common "github.com/ethereum/go-ethereum/common" types "github.com/ethereum/go-ethereum/core/types" - gomock "github.com/golang/mock/gomock" account "github.com/status-im/status-go/account" types0 "github.com/status-im/status-go/eth-node/types" params "github.com/status-im/status-go/params" diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index afb8b55ae..9fbf44fa8 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -122,9 +122,9 @@ func (w *Waku) broadcast() { fn = func(env *protocol.Envelope, logger *zap.Logger) error { err := sendFn(env, logger) if err == nil { - w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) + w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: env, PublishMethod: publishMethod}) } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) + w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) } return err } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 75da8df8b..fe52b30a2 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -104,11 +104,11 @@ type ErrorSendingEnvelope struct { } type ITelemetryClient interface { - PushReceivedEnvelope(receivedEnvelope *protocol.Envelope) - PushSentEnvelope(sentEnvelope SentEnvelope) - PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) - PushPeerCount(peerCount int) - PushPeerConnFailures(peerConnFailures map[string]int) + PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope) + PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope) + PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) + PushPeerCount(ctx context.Context, peerCount int) + PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) } // Waku represents a dark communication interface through the Ethereum @@ -510,6 +510,9 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi } func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { + w.wg.Add(1) + defer w.wg.Done() + if telemetryServerURL == "" { return } @@ -546,7 +549,9 @@ func (w *Waku) GetStats() types.StatsSummary { } func (w *Waku) runPeerExchangeLoop() { + w.wg.Add(1) defer w.wg.Done() + if !w.cfg.EnablePeerExchangeClient { // Currently peer exchange client is only used for light nodes return @@ -1083,8 +1088,7 @@ func (w *Waku) Start() error { } } - w.wg.Add(2) - + w.wg.Add(1) go func() { defer w.wg.Done() @@ -1124,8 +1128,8 @@ func (w *Waku) Start() error { if w.statusTelemetryClient != nil { connFailures := FormatPeerConnFailures(w.node) - w.statusTelemetryClient.PushPeerCount(w.PeerCount()) - w.statusTelemetryClient.PushPeerConnFailures(connFailures) + w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount()) + w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures) } w.ConnectionChanged(connection.State{ @@ -1142,7 +1146,6 @@ func (w *Waku) Start() error { go w.runPeerExchangeLoop() if w.cfg.EnableMissingMessageVerification { - w.wg.Add(1) w.missingMsgVerifier = missing.NewMissingMessageVerifier( w.node.Store(), @@ -1152,7 +1155,9 @@ func (w *Waku) Start() error { w.missingMsgVerifier.Start(w.ctx) + w.wg.Add(1) go func() { + w.wg.Done() for { select { case <-w.ctx.Done(): @@ -1194,7 +1199,6 @@ func (w *Waku) Start() error { } // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` - w.wg.Add(1) go w.seedBootnodesForDiscV5() return nil @@ -1302,7 +1306,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag } if w.statusTelemetryClient != nil { - w.statusTelemetryClient.PushReceivedEnvelope(envelope) + w.statusTelemetryClient.PushReceivedEnvelope(w.ctx, envelope) } logger := w.logger.With( @@ -1612,8 +1616,10 @@ func (w *Waku) ConnectionChanged(state connection.State) { // It backs off exponentially until maxRetries, at which point it restarts from 0 // It also restarts if there's a connection change signalled from the client func (w *Waku) seedBootnodesForDiscV5() { + w.wg.Add(1) + defer w.wg.Done() + if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil { - w.wg.Done() return } @@ -1674,7 +1680,6 @@ func (w *Waku) seedBootnodesForDiscV5() { } case <-w.ctx.Done(): - w.wg.Done() w.logger.Debug("bootnode seeding stopped") return }