fix_: cherry pick 5696: block on logout (#5703)

* fix_: block on logout (#5696)

* fix_: check context on processAndPushTelemetry

* fix_: fix waitgroup add/done in Waku

* fix(pendingTracker)_: stop the pending transacitonTracker on logout

* chore_: lint fix

---------

Co-authored-by: Jonathan Rainville <rainville.jonathan@gmail.com>
This commit is contained in:
Igor Sirotin 2024-08-13 20:13:15 +01:00 committed by GitHub
parent 3187ce57bf
commit 67cf830097
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 60 additions and 39 deletions

View File

@ -720,6 +720,13 @@ func (b *StatusNode) Cleanup() error {
} }
} }
if b.pendingTracker != nil {
err := b.pendingTracker.Stop()
if err != nil {
return err
}
}
return nil return nil
} }

View File

@ -3508,7 +3508,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
statusMessages := handleMessagesResponse.StatusMessages statusMessages := handleMessagesResponse.StatusMessages
if m.telemetryClient != nil { if m.telemetryClient != nil {
m.telemetryClient.PushReceivedMessages(telemetry.ReceivedMessages{ m.telemetryClient.PushReceivedMessages(m.ctx, telemetry.ReceivedMessages{
Filter: filter, Filter: filter,
SSHMessage: shhMessage, SSHMessage: shhMessage,
Messages: statusMessages, Messages: statusMessages,

View File

@ -13,8 +13,9 @@ import (
context "context" context "context"
reflect "reflect" reflect "reflect"
onramp "github.com/status-im/status-go/services/wallet/onramp"
gomock "go.uber.org/mock/gomock" gomock "go.uber.org/mock/gomock"
onramp "github.com/status-im/status-go/services/wallet/onramp"
) )
// MockProvider is a mock of Provider interface. // MockProvider is a mock of Provider interface.

View File

@ -10,8 +10,9 @@ import (
big "math/big" big "math/big"
reflect "reflect" reflect "reflect"
common "github.com/ethereum/go-ethereum/common"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
common "github.com/ethereum/go-ethereum/common"
paraswap "github.com/status-im/status-go/services/wallet/thirdparty/paraswap" paraswap "github.com/status-im/status-go/services/wallet/thirdparty/paraswap"
) )

View File

@ -40,24 +40,24 @@ type TelemetryRequest struct {
TelemetryData *json.RawMessage `json:"telemetry_data"` TelemetryData *json.RawMessage `json:"telemetry_data"`
} }
func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { func (c *Client) PushReceivedMessages(ctx context.Context, receivedMessages ReceivedMessages) {
c.processAndPushTelemetry(receivedMessages) c.processAndPushTelemetry(ctx, receivedMessages)
} }
func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope wakuv2.SentEnvelope) {
c.processAndPushTelemetry(sentEnvelope) c.processAndPushTelemetry(ctx, sentEnvelope)
} }
func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) { func (c *Client) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *v2protocol.Envelope) {
c.processAndPushTelemetry(receivedEnvelope) c.processAndPushTelemetry(ctx, receivedEnvelope)
} }
func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope wakuv2.ErrorSendingEnvelope) {
c.processAndPushTelemetry(errorSendingEnvelope) c.processAndPushTelemetry(ctx, errorSendingEnvelope)
} }
func (c *Client) PushPeerCount(peerCount int) { func (c *Client) PushPeerCount(ctx context.Context, peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) c.processAndPushTelemetry(ctx, PeerCount{PeerCount: peerCount})
} }
type ReceivedMessages struct { type ReceivedMessages struct {
@ -120,7 +120,6 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str
func (c *Client) Start(ctx context.Context) { func (c *Client) Start(ctx context.Context) {
go func() { go func() {
for { for {
select { select {
case telemetryRequest := <-c.telemetryCh: case telemetryRequest := <-c.telemetryCh:
@ -165,7 +164,7 @@ func (c *Client) Start(ctx context.Context) {
}() }()
} }
func (c *Client) processAndPushTelemetry(data interface{}) { func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) {
var telemetryRequest TelemetryRequest var telemetryRequest TelemetryRequest
switch v := data.(type) { switch v := data.(type) {
case ReceivedMessages: case ReceivedMessages:
@ -203,7 +202,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
return return
} }
c.telemetryCh <- telemetryRequest select {
case <-ctx.Done():
return
case c.telemetryCh <- telemetryRequest:
}
c.nextIdLock.Lock() c.nextIdLock.Lock()
c.nextId++ c.nextId++
c.nextIdLock.Unlock() c.nextIdLock.Unlock()

View File

@ -140,7 +140,7 @@ func TestClient_ProcessReceivedMessages(t *testing.T) {
// Send the telemetry request // Send the telemetry request
client.Start(ctx) client.Start(ctx)
client.PushReceivedMessages(data) client.PushReceivedMessages(ctx, data)
}) })
} }
@ -156,7 +156,7 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) {
// Send the telemetry request // Send the telemetry request
client.Start(ctx) client.Start(ctx)
client.PushReceivedEnvelope(envelope) client.PushReceivedEnvelope(ctx, envelope)
}) })
} }
@ -175,7 +175,7 @@ func TestClient_ProcessSentEnvelope(t *testing.T) {
// Send the telemetry request // Send the telemetry request
client.Start(ctx) client.Start(ctx)
client.PushSentEnvelope(sentEnvelope) client.PushSentEnvelope(ctx, sentEnvelope)
}) })
} }
@ -276,11 +276,13 @@ func TestRetryCache(t *testing.T) {
})) }))
defer mockServer.Close() defer mockServer.Close()
ctx := context.Background()
client := createClient(t, mockServer.URL) client := createClient(t, mockServer.URL)
client.Start(context.Background()) client.Start(ctx)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic, ContentTopic: testContentTopic,
Version: proto.Uint32(0), Version: proto.Uint32(0),
@ -292,7 +294,7 @@ func TestRetryCache(t *testing.T) {
require.Equal(t, 3, len(client.telemetryRetryCache)) require.Equal(t, 3, len(client.telemetryRetryCache))
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic, ContentTopic: testContentTopic,
Version: proto.Uint32(0), Version: proto.Uint32(0),
@ -307,11 +309,13 @@ func TestRetryCache(t *testing.T) {
} }
func TestRetryCacheCleanup(t *testing.T) { func TestRetryCacheCleanup(t *testing.T) {
ctx := context.Background()
client := createClient(t, "") client := createClient(t, "")
client.Start(context.Background()) client.Start(ctx)
for i := 0; i < 6000; i++ { for i := 0; i < 6000; i++ {
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic, ContentTopic: testContentTopic,
Version: proto.Uint32(0), Version: proto.Uint32(0),
@ -323,7 +327,7 @@ func TestRetryCacheCleanup(t *testing.T) {
require.Equal(t, 6000, len(client.telemetryRetryCache)) require.Equal(t, 6000, len(client.telemetryRetryCache))
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic, ContentTopic: testContentTopic,
Version: proto.Uint32(0), Version: proto.Uint32(0),

View File

@ -95,6 +95,7 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s
} }
func (w *Waku) checkForMissingMessages() { func (w *Waku) checkForMissingMessages() {
w.wg.Add(1)
defer w.wg.Done() defer w.wg.Done()
defer w.logger.Debug("checkForMissingMessages - done") defer w.logger.Debug("checkForMissingMessages - done")

View File

@ -104,10 +104,10 @@ type ErrorSendingEnvelope struct {
} }
type ITelemetryClient interface { type ITelemetryClient interface {
PushReceivedEnvelope(receivedEnvelope *protocol.Envelope) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope)
PushSentEnvelope(sentEnvelope SentEnvelope) PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int) PushPeerCount(ctx context.Context, peerCount int)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -507,6 +507,9 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi
} }
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
w.wg.Add(1)
defer w.wg.Done()
if telemetryServerURL == "" { if telemetryServerURL == "" {
return return
} }
@ -543,7 +546,9 @@ func (w *Waku) GetStats() types.StatsSummary {
} }
func (w *Waku) runPeerExchangeLoop() { func (w *Waku) runPeerExchangeLoop() {
w.wg.Add(1)
defer w.wg.Done() defer w.wg.Done()
if !w.cfg.EnablePeerExchangeClient { if !w.cfg.EnablePeerExchangeClient {
// Currently peer exchange client is only used for light nodes // Currently peer exchange client is only used for light nodes
return return
@ -1035,9 +1040,9 @@ func (w *Waku) broadcast() {
fn = func(env *protocol.Envelope, logger *zap.Logger) error { fn = func(env *protocol.Envelope, logger *zap.Logger) error {
err := sendFn(env, logger) err := sendFn(env, logger)
if err == nil { if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: env, PublishMethod: publishMethod})
} else { } else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
} }
return err return err
} }
@ -1348,8 +1353,7 @@ func (w *Waku) Start() error {
} }
} }
w.wg.Add(2) w.wg.Add(1)
go func() { go func() {
defer w.wg.Done() defer w.wg.Done()
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(5 * time.Second)
@ -1375,7 +1379,6 @@ func (w *Waku) Start() error {
go w.runPeerExchangeLoop() go w.runPeerExchangeLoop()
if w.cfg.EnableMissingMessageVerification { if w.cfg.EnableMissingMessageVerification {
w.wg.Add(1)
go w.checkForMissingMessages() go w.checkForMissingMessages()
} }
@ -1402,7 +1405,6 @@ func (w *Waku) Start() error {
go w.checkIfMessagesStored() go w.checkIfMessagesStored()
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
w.wg.Add(1)
go w.seedBootnodesForDiscV5() go w.seedBootnodesForDiscV5()
return nil return nil
@ -1449,7 +1451,7 @@ func (w *Waku) checkForConnectionChanges() {
} }
if w.statusTelemetryClient != nil { if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushPeerCount(w.PeerCount()) w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount())
} }
//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
@ -1528,7 +1530,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
} }
if w.statusTelemetryClient != nil { if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushReceivedEnvelope(envelope) w.statusTelemetryClient.PushReceivedEnvelope(w.ctx, envelope)
} }
logger := w.logger.With( logger := w.logger.With(
@ -1820,8 +1822,10 @@ func (w *Waku) ConnectionChanged(state connection.State) {
// It backs off exponentially until maxRetries, at which point it restarts from 0 // It backs off exponentially until maxRetries, at which point it restarts from 0
// It also restarts if there's a connection change signalled from the client // It also restarts if there's a connection change signalled from the client
func (w *Waku) seedBootnodesForDiscV5() { func (w *Waku) seedBootnodesForDiscV5() {
w.wg.Add(1)
defer w.wg.Done()
if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil { if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil {
w.wg.Done()
return return
} }
@ -1882,7 +1886,6 @@ func (w *Waku) seedBootnodesForDiscV5() {
} }
case <-w.ctx.Done(): case <-w.ctx.Done():
w.wg.Done()
w.logger.Debug("bootnode seeding stopped") w.logger.Debug("bootnode seeding stopped")
return return
} }