From 2dd9cbec78c9c7fa86d5ad0f06b716203a2eb83e Mon Sep 17 00:00:00 2001 From: Andrey Bocharnikov Date: Tue, 8 Oct 2024 13:46:50 +0400 Subject: [PATCH] fix_: address PR comments --- circuitbreaker/circuit_breaker_test.go | 32 ++--- healthmanager/aggregator/aggregator.go | 6 +- healthmanager/aggregator/aggregator_test.go | 7 +- healthmanager/blockchain_health_manager.go | 109 ++++++++++-------- .../blockchain_health_manager_test.go | 35 ++++-- .../provider_errors/rpc_provider_errors.go | 3 +- healthmanager/providers_health_manager.go | 61 +++++----- .../providers_health_manager_test.go | 25 ++-- healthmanager/rpcstatus/provider_status.go | 10 +- .../rpcstatus/provider_status_test.go | 3 +- node/get_status_node.go | 11 +- rpc/chain/blockchain_health_test.go | 20 ++-- rpc/chain/client_health_test.go | 18 +-- rpc/client.go | 35 ++++-- rpc/client_test.go | 32 ++++- rpc/verif_proxy_test.go | 11 +- services/ens/api_test.go | 10 +- services/wallet/api_test.go | 10 +- services/wallet/history/service_test.go | 11 +- services/wallet/market/market_feed_test.go | 9 +- services/wallet/router/router_test.go | 10 +- services/wallet/token/token_test.go | 12 +- .../transfer/commands_sequential_test.go | 93 ++++++++++++--- services/web3provider/api_test.go | 10 +- transactions/transactor_test.go | 17 ++- 25 files changed, 408 insertions(+), 192 deletions(-) diff --git a/circuitbreaker/circuit_breaker_test.go b/circuitbreaker/circuit_breaker_test.go index 1f7ce820f..c159f26e6 100644 --- a/circuitbreaker/circuit_breaker_test.go +++ b/circuitbreaker/circuit_breaker_test.go @@ -323,11 +323,11 @@ func TestCircuitBreaker_SuccessCallStatus(t *testing.T) { assert.Len(t, result.FunctorCallStatuses(), 1) status := result.FunctorCallStatuses()[0] - if status.name != "successCircuit" { - t.Errorf("Expected functor name to be 'successCircuit', got %s", status.name) + if status.Name != "successCircuit" { + t.Errorf("Expected functor name to be 'successCircuit', got %s", status.Name) } - if status.err != nil { - t.Errorf("Expected no error in functor status, got %v", status.err) + if status.Err != nil { + t.Errorf("Expected no error in functor status, got %v", status.Err) } } @@ -350,11 +350,11 @@ func TestCircuitBreaker_ErrorCallStatus(t *testing.T) { assert.Len(t, result.FunctorCallStatuses(), 1) status := result.FunctorCallStatuses()[0] - if status.name != "errorCircuit" { - t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.name) + if status.Name != "errorCircuit" { + t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.Name) } - if !errors.Is(status.err, expectedError) { - t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.err) + if !errors.Is(status.Err, expectedError) { + t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.Err) } } @@ -405,11 +405,11 @@ func TestCircuitBreaker_MultipleFunctorsResult(t *testing.T) { statuses := result.FunctorCallStatuses() require.Len(t, statuses, 2) - require.Equal(t, statuses[0].name, "circuit1") - require.NotNil(t, statuses[0].err) + require.Equal(t, statuses[0].Name, "circuit1") + require.NotNil(t, statuses[0].Err) - require.Equal(t, statuses[1].name, "circuit2") - require.Nil(t, statuses[1].err) + require.Equal(t, statuses[1].Name, "circuit2") + require.Nil(t, statuses[1].Err) } func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) { @@ -444,9 +444,9 @@ func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) { statuses := result.FunctorCallStatuses() require.Len(t, statuses, 2) - require.Equal(t, statuses[0].name, "circuitName") - require.NotNil(t, statuses[0].err) + require.Equal(t, statuses[0].Name, "circuitName") + require.NotNil(t, statuses[0].Err) - require.Equal(t, statuses[1].name, "circuitName") - require.Nil(t, statuses[1].err) + require.Equal(t, statuses[1].Name, "circuitName") + require.Nil(t, statuses[1].Err) } diff --git a/healthmanager/aggregator/aggregator.go b/healthmanager/aggregator/aggregator.go index b81edc01a..ef872a184 100644 --- a/healthmanager/aggregator/aggregator.go +++ b/healthmanager/aggregator/aggregator.go @@ -10,14 +10,14 @@ import ( // Aggregator manages and aggregates the statuses of multiple providers. type Aggregator struct { mu sync.RWMutex - Name string + name string providerStatuses map[string]*rpcstatus.ProviderStatus } // NewAggregator creates a new instance of Aggregator with the given name. func NewAggregator(name string) *Aggregator { return &Aggregator{ - Name: name, + name: name, providerStatuses: make(map[string]*rpcstatus.ProviderStatus), } } @@ -100,7 +100,7 @@ func (a *Aggregator) ComputeAggregatedStatus() rpcstatus.ProviderStatus { } aggregatedStatus := rpcstatus.ProviderStatus{ - Name: a.Name, + Name: a.name, LastSuccessAt: lastSuccessAt, LastErrorAt: lastErrorAt, LastError: lastError, diff --git a/healthmanager/aggregator/aggregator_test.go b/healthmanager/aggregator/aggregator_test.go index 22246e485..87e029bf2 100644 --- a/healthmanager/aggregator/aggregator_test.go +++ b/healthmanager/aggregator/aggregator_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" - "github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + + "github.com/status-im/status-go/healthmanager/rpcstatus" ) // StatusAggregatorTestSuite defines the test suite for Aggregator. @@ -23,7 +24,7 @@ func (suite *StatusAggregatorTestSuite) SetupTest() { // TestNewAggregator verifies that a new Aggregator is initialized correctly. func (suite *StatusAggregatorTestSuite) TestNewAggregator() { - assert.Equal(suite.T(), "TestAggregator", suite.aggregator.Name, "Aggregator name should be set correctly") + assert.Equal(suite.T(), "TestAggregator", suite.aggregator.name, "Aggregator name should be set correctly") assert.Empty(suite.T(), suite.aggregator.providerStatuses, "Aggregator should have no providers initially") } @@ -94,7 +95,7 @@ func (suite *StatusAggregatorTestSuite) TestUpdate() { func (suite *StatusAggregatorTestSuite) TestComputeAggregatedStatus_NoProviders() { aggStatus := suite.aggregator.ComputeAggregatedStatus() - assert.Equal(suite.T(), rpcstatus.StatusUnknown, aggStatus.Status, "Aggregated status should be 'unknown' when no providers are registered") + assert.Equal(suite.T(), rpcstatus.StatusDown, aggStatus.Status, "Aggregated status should be 'down' when no providers are registered") assert.True(suite.T(), aggStatus.LastSuccessAt.IsZero(), "LastSuccessAt should be zero when no providers are registered") assert.True(suite.T(), aggStatus.LastErrorAt.IsZero(), "LastErrorAt should be zero when no providers are registered") } diff --git a/healthmanager/blockchain_health_manager.go b/healthmanager/blockchain_health_manager.go index 04ba44e23..7fcdf5d6d 100644 --- a/healthmanager/blockchain_health_manager.go +++ b/healthmanager/blockchain_health_manager.go @@ -2,15 +2,16 @@ package healthmanager import ( "context" - "fmt" + "sync" + "github.com/status-im/status-go/healthmanager/aggregator" "github.com/status-im/status-go/healthmanager/rpcstatus" - "sync" ) // BlockchainFullStatus contains the full status of the blockchain, including provider statuses. type BlockchainFullStatus struct { Status rpcstatus.ProviderStatus `json:"status"` + StatusPerChain map[uint64]rpcstatus.ProviderStatus `json:"statusPerChain"` StatusPerChainPerProvider map[uint64]map[string]rpcstatus.ProviderStatus `json:"statusPerChainPerProvider"` } @@ -24,11 +25,11 @@ type BlockchainStatus struct { type BlockchainHealthManager struct { mu sync.RWMutex aggregator *aggregator.Aggregator - subscribers []chan struct{} + subscribers sync.Map // thread-safe providers map[uint64]*ProvidersHealthManager cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions - lastStatus BlockchainStatus + lastStatus *BlockchainStatus wg sync.WaitGroup } @@ -43,30 +44,37 @@ func NewBlockchainHealthManager() *BlockchainHealthManager { } // RegisterProvidersHealthManager registers the provider health manager. -// It prevents registering the same provider twice for the same chain. +// It removes any existing provider for the same chain before registering the new one. func (b *BlockchainHealthManager) RegisterProvidersHealthManager(ctx context.Context, phm *ProvidersHealthManager) error { b.mu.Lock() defer b.mu.Unlock() - // Check if the provider for the given chainID is already registered - if _, exists := b.providers[phm.ChainID()]; exists { - // Log a warning or return an error to indicate that the provider is already registered - return fmt.Errorf("provider for chainID %d is already registered", phm.ChainID()) + chainID := phm.ChainID() + + // Check if a provider for the given chainID is already registered and remove it + if _, exists := b.providers[chainID]; exists { + // Cancel the existing context + if cancel, cancelExists := b.cancelFuncs[chainID]; cancelExists { + cancel() + } + // Remove the old registration + delete(b.providers, chainID) + delete(b.cancelFuncs, chainID) } // Proceed with the registration - b.providers[phm.ChainID()] = phm + b.providers[chainID] = phm // Create a new context for the provider providerCtx, cancel := context.WithCancel(ctx) - b.cancelFuncs[phm.ChainID()] = cancel + b.cancelFuncs[chainID] = cancel statusCh := phm.Subscribe() b.wg.Add(1) go func(phm *ProvidersHealthManager, statusCh chan struct{}, providerCtx context.Context) { defer func() { - b.wg.Done() phm.Unsubscribe(statusCh) + b.wg.Done() }() for { select { @@ -91,6 +99,7 @@ func (b *BlockchainHealthManager) Stop() { cancel() } clear(b.cancelFuncs) + clear(b.providers) b.mu.Unlock() b.wg.Wait() @@ -99,30 +108,31 @@ func (b *BlockchainHealthManager) Stop() { // Subscribe allows clients to receive notifications about changes. func (b *BlockchainHealthManager) Subscribe() chan struct{} { ch := make(chan struct{}, 1) - b.mu.Lock() - defer b.mu.Unlock() - b.subscribers = append(b.subscribers, ch) + b.subscribers.Store(ch, struct{}{}) return ch } // Unsubscribe removes a subscriber from receiving notifications. func (b *BlockchainHealthManager) Unsubscribe(ch chan struct{}) { - b.mu.Lock() - defer b.mu.Unlock() - - // Remove the subscriber channel from the list - for i, subscriber := range b.subscribers { - if subscriber == ch { - b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...) - close(ch) - break - } - } + b.subscribers.Delete(ch) // Удаляем подписчика из sync.Map + close(ch) } // aggregateAndUpdateStatus collects statuses from all providers and updates the overall and short status. func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context) { + newShortStatus := b.aggregateStatus() + + // If status has changed, update the last status and emit notifications + if b.shouldUpdateStatus(newShortStatus) { + b.updateStatus(newShortStatus) + b.emitBlockchainHealthStatus(ctx) + } +} + +// aggregateStatus aggregates provider statuses and returns the new short status. +func (b *BlockchainHealthManager) aggregateStatus() BlockchainStatus { b.mu.Lock() + defer b.mu.Unlock() // Collect statuses from all providers providerStatuses := make([]rpcstatus.ProviderStatus, 0) @@ -134,16 +144,22 @@ func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context) b.aggregator.UpdateBatch(providerStatuses) // Get the new aggregated full and short status - newShortStatus := b.getShortStatus() - b.mu.Unlock() + return b.getStatusPerChain() +} - // Compare full and short statuses and emit if changed - if !compareShortStatus(newShortStatus, b.lastStatus) { - b.emitBlockchainHealthStatus(ctx) - b.mu.Lock() - defer b.mu.Unlock() - b.lastStatus = newShortStatus - } +// shouldUpdateStatus checks if the status has changed and needs to be updated. +func (b *BlockchainHealthManager) shouldUpdateStatus(newShortStatus BlockchainStatus) bool { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.lastStatus == nil || !compareShortStatus(newShortStatus, *b.lastStatus) +} + +// updateStatus updates the last known status with the new status. +func (b *BlockchainHealthManager) updateStatus(newShortStatus BlockchainStatus) { + b.mu.Lock() + defer b.mu.Unlock() + b.lastStatus = &newShortStatus } // compareShortStatus compares two BlockchainStatus structs and returns true if they are identical. @@ -167,18 +183,18 @@ func compareShortStatus(newStatus, previousStatus BlockchainStatus) bool { // emitBlockchainHealthStatus sends a notification to all subscribers about the new blockchain status. func (b *BlockchainHealthManager) emitBlockchainHealthStatus(ctx context.Context) { - b.mu.RLock() - defer b.mu.RUnlock() - for _, subscriber := range b.subscribers { + b.subscribers.Range(func(key, value interface{}) bool { + subscriber := key.(chan struct{}) select { case <-ctx.Done(): // Stop sending notifications when the context is cancelled - return + return false case subscriber <- struct{}{}: default: - // Skip notification if the subscriber's channel is full + // Skip notification if the subscriber's channel is full (non-blocking) } - } + return true + }) } func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus { @@ -192,15 +208,16 @@ func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus { statusPerChainPerProvider[chainID] = providerStatuses } - blockchainStatus := b.aggregator.GetAggregatedStatus() + statusPerChain := b.getStatusPerChain() return BlockchainFullStatus{ - Status: blockchainStatus, + Status: statusPerChain.Status, + StatusPerChain: statusPerChain.StatusPerChain, StatusPerChainPerProvider: statusPerChainPerProvider, } } -func (b *BlockchainHealthManager) getShortStatus() BlockchainStatus { +func (b *BlockchainHealthManager) getStatusPerChain() BlockchainStatus { statusPerChain := make(map[uint64]rpcstatus.ProviderStatus) for chainID, phm := range b.providers { @@ -216,10 +233,10 @@ func (b *BlockchainHealthManager) getShortStatus() BlockchainStatus { } } -func (b *BlockchainHealthManager) GetShortStatus() BlockchainStatus { +func (b *BlockchainHealthManager) GetStatusPerChain() BlockchainStatus { b.mu.RLock() defer b.mu.RUnlock() - return b.getShortStatus() + return b.getStatusPerChain() } // Status returns the current aggregated status. diff --git a/healthmanager/blockchain_health_manager_test.go b/healthmanager/blockchain_health_manager_test.go index 8b65fcae6..88fdc447c 100644 --- a/healthmanager/blockchain_health_manager_test.go +++ b/healthmanager/blockchain_health_manager_test.go @@ -8,8 +8,9 @@ import ( "testing" "time" - "github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/stretchr/testify/suite" + + "github.com/status-im/status-go/healthmanager/rpcstatus" ) type BlockchainHealthManagerSuite struct { @@ -50,7 +51,8 @@ func (s *BlockchainHealthManagerSuite) assertBlockChainStatus(expected rpcstatus // Test registering a provider health manager func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() { phm := NewProvidersHealthManager(1) // Create a real ProvidersHealthManager - s.manager.RegisterProvidersHealthManager(context.Background(), phm) + err := s.manager.RegisterProvidersHealthManager(context.Background(), phm) + s.Require().NoError(err) // Verify that the provider is registered s.Require().NotNil(s.manager.providers[1]) @@ -59,7 +61,8 @@ func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() { // Test status updates and notifications func (s *BlockchainHealthManagerSuite) TestStatusUpdateNotification() { phm := NewProvidersHealthManager(1) - s.manager.RegisterProvidersHealthManager(context.Background(), phm) + err := s.manager.RegisterProvidersHealthManager(context.Background(), phm) + s.Require().NoError(err) ch := s.manager.Subscribe() // Update the provider status @@ -75,8 +78,10 @@ func (s *BlockchainHealthManagerSuite) TestGetFullStatus() { phm1 := NewProvidersHealthManager(1) phm2 := NewProvidersHealthManager(2) ctx := context.Background() - s.manager.RegisterProvidersHealthManager(ctx, phm1) - s.manager.RegisterProvidersHealthManager(ctx, phm2) + err := s.manager.RegisterProvidersHealthManager(ctx, phm1) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(ctx, phm2) + s.Require().NoError(err) ch := s.manager.Subscribe() // Update the provider status @@ -108,8 +113,15 @@ func (s *BlockchainHealthManagerSuite) TestConcurrentSubscriptionUnsubscription( } wg.Wait() + + activeSubscribersCount := 0 + s.manager.subscribers.Range(func(key, value interface{}) bool { + activeSubscribersCount++ + return true + }) + // After all subscribers are removed, there should be no active subscribers - s.Equal(0, len(s.manager.subscribers), "Expected no subscribers after unsubscription") + s.Equal(0, activeSubscribersCount, "Expected no subscribers after unsubscription") } func (s *BlockchainHealthManagerSuite) TestConcurrency() { @@ -120,7 +132,8 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() { defer cancel() for i := 1; i <= chainsCount; i++ { phm := NewProvidersHealthManager(uint64(i)) - s.manager.RegisterProvidersHealthManager(ctx, phm) + err := s.manager.RegisterProvidersHealthManager(ctx, phm) + s.Require().NoError(err) } ch := s.manager.Subscribe() @@ -161,7 +174,8 @@ func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers() // Create an instance of BlockchainHealthManager and register a provider manager phm := NewProvidersHealthManager(1) ctx, cancel := context.WithCancel(s.ctx) - s.manager.RegisterProvidersHealthManager(ctx, phm) + err := s.manager.RegisterProvidersHealthManager(ctx, phm) + s.Require().NoError(err) defer cancel() @@ -196,7 +210,8 @@ func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers() func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() { // Register a provider for chain 1 phm := NewProvidersHealthManager(1) - s.manager.RegisterProvidersHealthManager(s.ctx, phm) + err := s.manager.RegisterProvidersHealthManager(s.ctx, phm) + s.Require().NoError(err) // Subscribe to status updates ch := s.manager.Subscribe() @@ -212,7 +227,7 @@ func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() { s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) // Verify that the short status reflects the chain as down, since one provider is down - shortStatus := s.manager.GetShortStatus() + shortStatus := s.manager.GetStatusPerChain() s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status) s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[1].Status) // Chain 1 should be marked as down } diff --git a/healthmanager/provider_errors/rpc_provider_errors.go b/healthmanager/provider_errors/rpc_provider_errors.go index 49f8c3447..7f1c8c90e 100644 --- a/healthmanager/provider_errors/rpc_provider_errors.go +++ b/healthmanager/provider_errors/rpc_provider_errors.go @@ -2,10 +2,11 @@ package provider_errors import ( "errors" + "strings" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/rpc" - "strings" ) type RpcProviderErrorType string diff --git a/healthmanager/providers_health_manager.go b/healthmanager/providers_health_manager.go index 9250132c8..4d1c340e9 100644 --- a/healthmanager/providers_health_manager.go +++ b/healthmanager/providers_health_manager.go @@ -5,24 +5,25 @@ import ( "fmt" "sync" - statusaggregator "github.com/status-im/status-go/healthmanager/aggregator" + "github.com/status-im/status-go/healthmanager/aggregator" "github.com/status-im/status-go/healthmanager/rpcstatus" ) type ProvidersHealthManager struct { mu sync.RWMutex chainID uint64 - aggregator *statusaggregator.Aggregator - subscribers []chan struct{} + aggregator *aggregator.Aggregator + subscribers sync.Map // Use sync.Map for concurrent access to subscribers + lastStatus *rpcstatus.ProviderStatus } // NewProvidersHealthManager creates a new instance of ProvidersHealthManager with the given chain ID. func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager { - aggregator := statusaggregator.NewAggregator(fmt.Sprintf("%d", chainID)) + agg := aggregator.NewAggregator(fmt.Sprintf("%d", chainID)) return &ProvidersHealthManager{ chainID: chainID, - aggregator: aggregator, + aggregator: agg, } } @@ -30,8 +31,6 @@ func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager { func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcstatus.RpcProviderCallStatus) { p.mu.Lock() - previousStatus := p.aggregator.GetAggregatedStatus() - // Update the aggregator with the new provider statuses for _, rpcCallStatus := range callStatuses { providerStatus := rpcstatus.NewRpcProviderStatus(rpcCallStatus) @@ -40,7 +39,7 @@ func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcs newStatus := p.aggregator.GetAggregatedStatus() - shouldEmit := newStatus.Status != previousStatus.Status + shouldEmit := p.lastStatus == nil || p.lastStatus.Status != newStatus.Status p.mu.Unlock() if !shouldEmit { @@ -48,6 +47,9 @@ func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcs } p.emitChainStatus(ctx) + p.mu.Lock() + defer p.mu.Unlock() + p.lastStatus = &newStatus } // GetStatuses returns a copy of the current provider statuses. @@ -59,46 +61,35 @@ func (p *ProvidersHealthManager) GetStatuses() map[string]rpcstatus.ProviderStat // Subscribe allows providers to receive notifications about changes. func (p *ProvidersHealthManager) Subscribe() chan struct{} { - p.mu.Lock() - defer p.mu.Unlock() - ch := make(chan struct{}, 1) - p.subscribers = append(p.subscribers, ch) + p.subscribers.Store(ch, struct{}{}) return ch } // Unsubscribe removes a subscriber from receiving notifications. func (p *ProvidersHealthManager) Unsubscribe(ch chan struct{}) { - p.mu.Lock() - defer p.mu.Unlock() - - for i, subscriber := range p.subscribers { - if subscriber == ch { - p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) - close(ch) - break - } - } + p.subscribers.Delete(ch) + close(ch) } // UnsubscribeAll removes all subscriber channels. func (p *ProvidersHealthManager) UnsubscribeAll() { - p.mu.Lock() - defer p.mu.Unlock() - for _, subscriber := range p.subscribers { - close(subscriber) - } - p.subscribers = nil + p.subscribers.Range(func(key, value interface{}) bool { + ch := key.(chan struct{}) + close(ch) + p.subscribers.Delete(key) + return true + }) } // Reset clears all provider statuses and resets the chain status to unknown. func (p *ProvidersHealthManager) Reset() { p.mu.Lock() defer p.mu.Unlock() - p.aggregator = statusaggregator.NewAggregator(fmt.Sprintf("%d", p.chainID)) + p.aggregator = aggregator.NewAggregator(fmt.Sprintf("%d", p.chainID)) } -// Status Returns the current aggregated status +// Status Returns the current aggregated status. func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus { p.mu.RLock() defer p.mu.RUnlock() @@ -112,15 +103,15 @@ func (p *ProvidersHealthManager) ChainID() uint64 { // emitChainStatus sends a notification to all subscribers. func (p *ProvidersHealthManager) emitChainStatus(ctx context.Context) { - p.mu.RLock() - defer p.mu.RUnlock() - for _, subscriber := range p.subscribers { + p.subscribers.Range(func(key, value interface{}) bool { + subscriber := key.(chan struct{}) select { case subscriber <- struct{}{}: case <-ctx.Done(): - return + return false // Stop sending if context is done default: // Non-blocking send; skip if the channel is full } - } + return true + }) } diff --git a/healthmanager/providers_health_manager_test.go b/healthmanager/providers_health_manager_test.go index 4701e56ec..c40e3228f 100644 --- a/healthmanager/providers_health_manager_test.go +++ b/healthmanager/providers_health_manager_test.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/status-im/status-go/healthmanager/rpcstatus" - "github.com/stretchr/testify/suite" "sync" "testing" "time" + + "github.com/stretchr/testify/suite" + + "github.com/status-im/status-go/healthmanager/rpcstatus" ) type ProvidersHealthManagerSuite struct { @@ -139,9 +141,12 @@ func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() { phm1 := NewProvidersHealthManager(1) phm2 := NewProvidersHealthManager(2) phm3 := NewProvidersHealthManager(3) - s.manager.RegisterProvidersHealthManager(s.ctx, phm1) - s.manager.RegisterProvidersHealthManager(s.ctx, phm2) - s.manager.RegisterProvidersHealthManager(s.ctx, phm3) + err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(s.ctx, phm3) + s.Require().NoError(err) // Subscribe to status updates ch := s.manager.Subscribe() @@ -163,7 +168,7 @@ func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() { s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) // Check that short status correctly reflects the mixed state - shortStatus := s.manager.GetShortStatus() + shortStatus := s.manager.GetStatusPerChain() s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status) s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[2].Status) // Chain 2 is still up @@ -174,8 +179,10 @@ func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() { // Register providers for chains 1 and 2 phm1 := NewProvidersHealthManager(1) phm2 := NewProvidersHealthManager(2) - s.manager.RegisterProvidersHealthManager(s.ctx, phm1) - s.manager.RegisterProvidersHealthManager(s.ctx, phm2) + err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2) + s.Require().NoError(err) // Subscribe to status updates ch := s.manager.Subscribe() @@ -195,7 +202,7 @@ func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() { s.waitForUpdate(ch, rpcstatus.StatusDown, 100*time.Millisecond) // Check that short status reflects the final state where both chains are down - shortStatus := s.manager.GetShortStatus() + shortStatus := s.manager.GetStatusPerChain() s.Equal(rpcstatus.StatusDown, shortStatus.Status.Status) s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[2].Status) // Chain 2 is down diff --git a/healthmanager/rpcstatus/provider_status.go b/healthmanager/rpcstatus/provider_status.go index a79b8066b..26aabfc44 100644 --- a/healthmanager/rpcstatus/provider_status.go +++ b/healthmanager/rpcstatus/provider_status.go @@ -17,11 +17,11 @@ const ( // ProviderStatus holds the status information for a single provider. type ProviderStatus struct { - Name string - LastSuccessAt time.Time - LastErrorAt time.Time - LastError error - Status StatusType + Name string `json:"name"` + LastSuccessAt time.Time `json:"last_success_at"` + LastErrorAt time.Time `json:"last_error_at"` + LastError error `json:"last_error"` + Status StatusType `json:"status"` } // ProviderCallStatus represents the result of an arbitrary provider call. diff --git a/healthmanager/rpcstatus/provider_status_test.go b/healthmanager/rpcstatus/provider_status_test.go index c06cef188..3a4f73b80 100644 --- a/healthmanager/rpcstatus/provider_status_test.go +++ b/healthmanager/rpcstatus/provider_status_test.go @@ -2,9 +2,10 @@ package rpcstatus import ( "errors" - "github.com/status-im/status-go/rpc/chain/rpclimiter" "testing" "time" + + "github.com/status-im/status-go/rpc/chain/rpclimiter" ) func TestNewRpcProviderStatus(t *testing.T) { diff --git a/node/get_status_node.go b/node/get_status_node.go index dd4e92265..0e2984684 100644 --- a/node/get_status_node.go +++ b/node/get_status_node.go @@ -1,6 +1,7 @@ package node import ( + "context" "database/sql" "errors" "fmt" @@ -331,7 +332,15 @@ func (n *StatusNode) setupRPCClient() (err error) { }, } - n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.NetworkID, n.config.Networks, n.appDB, &n.walletFeed, providerConfigs) + config := rpc.ClientConfig{ + Client: gethNodeClient, + UpstreamChainID: n.config.NetworkID, + Networks: n.config.Networks, + DB: n.appDB, + WalletFeed: &n.walletFeed, + ProviderConfigs: providerConfigs, + } + n.rpcClient, err = rpc.NewClient(config) n.rpcClient.Start(context.Background()) if err != nil { return diff --git a/rpc/chain/blockchain_health_test.go b/rpc/chain/blockchain_health_test.go index 214a850b9..7665ee7f9 100644 --- a/rpc/chain/blockchain_health_test.go +++ b/rpc/chain/blockchain_health_test.go @@ -5,19 +5,22 @@ import ( "encoding/json" "errors" "fmt" - "github.com/status-im/status-go/healthmanager" - "github.com/status-im/status-go/healthmanager/rpcstatus" - mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient" "testing" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + "github.com/status-im/status-go/healthmanager" + "github.com/status-im/status-go/healthmanager/rpcstatus" + mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/status-im/status-go/rpc/chain/ethclient" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "go.uber.org/mock/gomock" + + "github.com/status-im/status-go/rpc/chain/ethclient" ) type BlockchainHealthManagerSuite struct { @@ -53,7 +56,8 @@ func (s *BlockchainHealthManagerSuite) setupClients(chainIDs []uint64) { phm := healthmanager.NewProvidersHealthManager(chainID) client := NewClient([]ethclient.RPSLimitedEthClientInterface{mockEthClient}, chainID, phm) - s.blockchainHealthManager.RegisterProvidersHealthManager(ctx, phm) + err := s.blockchainHealthManager.RegisterProvidersHealthManager(ctx, phm) + require.NoError(s.T(), err) s.mockProviders[chainID] = phm s.mockEthClients[chainID] = mockEthClient @@ -272,7 +276,7 @@ func (s *BlockchainHealthManagerSuite) TestGetShortStatus() { s.waitForStatus(statusCh, rpcstatus.StatusUp) // Get the short status from the BlockchainHealthManager - shortStatus := s.blockchainHealthManager.GetShortStatus() + shortStatus := s.blockchainHealthManager.GetStatusPerChain() // Assert overall blockchain status require.Equal(s.T(), rpcstatus.StatusUp, shortStatus.Status.Status) diff --git a/rpc/chain/client_health_test.go b/rpc/chain/client_health_test.go index e5422baab..ca5ae161b 100644 --- a/rpc/chain/client_health_test.go +++ b/rpc/chain/client_health_test.go @@ -3,19 +3,21 @@ package chain import ( "context" "errors" - "github.com/ethereum/go-ethereum/core/vm" "strconv" "testing" + "github.com/ethereum/go-ethereum/core/vm" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.uber.org/mock/gomock" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" healthManager "github.com/status-im/status-go/healthmanager" "github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/rpclimiter" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.uber.org/mock/gomock" mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient" ) @@ -162,7 +164,7 @@ func (s *ClientWithFallbackSuite) TestVMErrorDoesNotMarkChainDown() { require.Equal(s.T(), providerStatuses["test0"].Status, rpcstatus.StatusUp) } -func (s *ClientWithFallbackSuite) TestNoClientsChainUnknown() { +func (s *ClientWithFallbackSuite) TestNoClientsChainDown() { s.setupClients(0) ctx := context.Background() @@ -174,7 +176,7 @@ func (s *ClientWithFallbackSuite) TestNoClientsChainUnknown() { // THEN chainStatus := s.providersHealthManager.Status() - require.Equal(s.T(), rpcstatus.StatusUnknown, chainStatus.Status) + require.Equal(s.T(), rpcstatus.StatusDown, chainStatus.Status) } func (s *ClientWithFallbackSuite) TestAllClientsDifferentErrors() { @@ -228,11 +230,11 @@ func (s *ClientWithFallbackSuite) TestAllClientsNetworkErrors() { require.Equal(s.T(), providerStatuses["test2"].Status, rpcstatus.StatusDown) } -func (s *ClientWithFallbackSuite) TestChainStatusUnknownWhenAllProvidersUnknown() { +func (s *ClientWithFallbackSuite) TestChainStatusDownWhenInitial() { s.setupClients(2) chainStatus := s.providersHealthManager.Status() - require.Equal(s.T(), rpcstatus.StatusUnknown, chainStatus.Status) + require.Equal(s.T(), rpcstatus.StatusDown, chainStatus.Status) } func TestClientWithFallbackSuite(t *testing.T) { diff --git a/rpc/client.go b/rpc/client.go index 4def86455..da993ae2a 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -121,37 +121,47 @@ type Client struct { // Is initialized in a build-tag-dependent module var verifProxyInitFn func(c *Client) +// ClientConfig holds the configuration for initializing a new Client. +type ClientConfig struct { + Client *gethrpc.Client + UpstreamChainID uint64 + Networks []params.Network + DB *sql.DB + WalletFeed *event.Feed + ProviderConfigs []params.ProviderConfig +} + // NewClient initializes Client // // Client is safe for concurrent use and will automatically // reconnect to the server if connection is lost. -func NewClient(client *gethrpc.Client, upstreamChainID uint64, networks []params.Network, db *sql.DB, walletFeed *event.Feed, providerConfigs []params.ProviderConfig) (*Client, error) { +func NewClient(config ClientConfig) (*Client, error) { var err error log := log.New("package", "status-go/rpc.Client") - networkManager := network.NewManager(db) + networkManager := network.NewManager(config.DB) if networkManager == nil { return nil, errors.New("failed to create network manager") } - err = networkManager.Init(networks) + err = networkManager.Init(config.Networks) if err != nil { log.Error("Network manager failed to initialize", "error", err) } c := Client{ - local: client, + local: config.Client, NetworkManager: networkManager, handlers: make(map[string]Handler), rpcClients: make(map[uint64]chain.ClientInterface), limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter), log: log, - providerConfigs: providerConfigs, + providerConfigs: config.ProviderConfigs, healthMgr: healthmanager.NewBlockchainHealthManager(), - walletFeed: walletFeed, + walletFeed: config.WalletFeed, } - c.UpstreamChainID = upstreamChainID + c.UpstreamChainID = config.UpstreamChainID c.router = newRouter(true) if verifProxyInitFn != nil { @@ -268,9 +278,13 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err } phm := healthmanager.NewProvidersHealthManager(chainID) - c.healthMgr.RegisterProvidersHealthManager(context.Background(), phm) + err := c.healthMgr.RegisterProvidersHealthManager(context.Background(), phm) + if err != nil { + return nil, fmt.Errorf("register providers health manager: %s", err) + } client := chain.NewClient(ethClients, chainID, phm) + client.SetWalletNotifier(c.walletNotifier) c.rpcClients[chainID] = client return client, nil } @@ -439,9 +453,10 @@ func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result in if c.router.routeRemote(method) { client, err := c.getClientUsingCache(chainID) - if err == nil { - return client.CallContext(ctx, result, method, args...) + if err != nil { + return err } + return client.CallContext(ctx, result, method, args...) } if c.local == nil { diff --git a/rpc/client_test.go b/rpc/client_test.go index a0ad79af4..36a3ca1af 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -44,7 +44,15 @@ func TestBlockedRoutesCall(t *testing.T) { gethRPCClient, err := gethrpc.Dial(ts.URL) require.NoError(t, err) - c, err := NewClient(gethRPCClient, 1, []params.Network{}, db, nil, nil) + config := ClientConfig{ + Client: gethRPCClient, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + c, err := NewClient(config) require.NoError(t, err) for _, m := range blockedMethods { @@ -83,7 +91,15 @@ func TestBlockedRoutesRawCall(t *testing.T) { gethRPCClient, err := gethrpc.Dial(ts.URL) require.NoError(t, err) - c, err := NewClient(gethRPCClient, 1, []params.Network{}, db, nil, nil) + config := ClientConfig{ + Client: gethRPCClient, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + c, err := NewClient(config) require.NoError(t, err) for _, m := range blockedMethods { @@ -142,7 +158,17 @@ func TestGetClientsUsingCache(t *testing.T) { DefaultFallbackURL2: server.URL + path3, }, } - c, err := NewClient(nil, 1, networks, db, nil, providerConfigs) + + config := ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: networks, + DB: db, + WalletFeed: nil, + ProviderConfigs: providerConfigs, + } + + c, err := NewClient(config) require.NoError(t, err) // Networks from DB must pick up DefaultRPCURL, DefaultFallbackURL, DefaultFallbackURL2 diff --git a/rpc/verif_proxy_test.go b/rpc/verif_proxy_test.go index 34f774a11..907f334ca 100644 --- a/rpc/verif_proxy_test.go +++ b/rpc/verif_proxy_test.go @@ -48,7 +48,16 @@ func (s *ProxySuite) startRpcClient(infuraURL string) *Client { db, close := setupTestNetworkDB(s.T()) defer close() - c, err := NewClient(gethRPCClient, 1, []params.Network{}, db) + + config := ClientConfig{ + Client: gethRPCClient, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + c, err := NewClient(config) require.NoError(s.T(), err) return c diff --git a/services/ens/api_test.go b/services/ens/api_test.go index 4deba6432..ac3af2466 100644 --- a/services/ens/api_test.go +++ b/services/ens/api_test.go @@ -33,7 +33,15 @@ func setupTestAPI(t *testing.T) (*API, func()) { _ = client - rpcClient, err := statusRPC.NewClient(nil, 1, nil, db, nil) + config := statusRPC.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: nil, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + rpcClient, err := statusRPC.NewClient(config) require.NoError(t, err) // import account keys diff --git a/services/wallet/api_test.go b/services/wallet/api_test.go index b4337bbdd..9b14daa79 100644 --- a/services/wallet/api_test.go +++ b/services/wallet/api_test.go @@ -144,7 +144,15 @@ func TestAPI_GetAddressDetails(t *testing.T) { DefaultFallbackURL: serverWith1SecDelay.URL, }, } - c, err := rpc.NewClient(nil, chainID, networks, appDB, providerConfigs) + config := rpc.ClientConfig{ + Client: nil, + UpstreamChainID: chainID, + Networks: networks, + DB: appDB, + WalletFeed: nil, + ProviderConfigs: providerConfigs, + } + c, err := rpc.NewClient(config) require.NoError(t, err) chainClient, err := c.EthClient(chainID) diff --git a/services/wallet/history/service_test.go b/services/wallet/history/service_test.go index 97c5f4c9a..4bcf9e020 100644 --- a/services/wallet/history/service_test.go +++ b/services/wallet/history/service_test.go @@ -404,7 +404,16 @@ func Test_removeBalanceHistoryOnEventAccountRemoved(t *testing.T) { txServiceMockCtrl := gomock.NewController(t) server, _ := fake.NewTestServer(txServiceMockCtrl) client := gethrpc.DialInProc(server) - rpcClient, _ := rpc.NewClient(client, chainID, nil, appDB, nil) + + config := rpc.ClientConfig{ + Client: client, + UpstreamChainID: chainID, + Networks: nil, + DB: appDB, + WalletFeed: nil, + ProviderConfigs: nil, + } + rpcClient, _ := rpc.NewClient(config) rpcClient.UpstreamChainID = chainID service := NewService(walletDB, accountsDB, &accountFeed, &walletFeed, rpcClient, nil, nil, nil) diff --git a/services/wallet/market/market_feed_test.go b/services/wallet/market/market_feed_test.go index cccdb5930..15697040d 100644 --- a/services/wallet/market/market_feed_test.go +++ b/services/wallet/market/market_feed_test.go @@ -51,7 +51,7 @@ func (s *MarketTestSuite) TestEventOnRpsError() { s.Require().Equal(event.Type, EventMarketStatusChanged) } -func (s *MarketTestSuite) TestNoEventOnNetworkError() { +func (s *MarketTestSuite) TestEventOnNetworkError() { ctrl := gomock.NewController(s.T()) defer ctrl.Finish() @@ -62,10 +62,11 @@ func (s *MarketTestSuite) TestNoEventOnNetworkError() { _, err := manager.FetchPrices(s.symbols, s.currencies) s.Require().Error(err, "expected error from FetchPrices due to MockPriceProviderWithError") - _, ok := s.feedSub.WaitForEvent(time.Millisecond * 500) + event, ok := s.feedSub.WaitForEvent(500 * time.Millisecond) + s.Require().True(ok, "expected an event, but none was received") - //THEN - s.Require().False(ok, "expected no event, but one was received") + // THEN + s.Require().Equal(event.Type, EventMarketStatusChanged) } func TestMarketTestSuite(t *testing.T) { diff --git a/services/wallet/router/router_test.go b/services/wallet/router/router_test.go index 8b06ab174..ac36ede5e 100644 --- a/services/wallet/router/router_test.go +++ b/services/wallet/router/router_test.go @@ -91,7 +91,15 @@ func setupTestNetworkDB(t *testing.T) (*sql.DB, func()) { func setupRouter(t *testing.T) (*Router, func()) { db, cleanTmpDb := setupTestNetworkDB(t) - client, _ := rpc.NewClient(nil, 1, defaultNetworks, db, nil) + config := rpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: defaultNetworks, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := rpc.NewClient(config) router := NewRouter(client, nil, nil, nil, nil, nil, nil, nil) diff --git a/services/wallet/token/token_test.go b/services/wallet/token/token_test.go index f8b01c073..b0ea6920f 100644 --- a/services/wallet/token/token_test.go +++ b/services/wallet/token/token_test.go @@ -331,7 +331,17 @@ func Test_removeTokenBalanceOnEventAccountRemoved(t *testing.T) { txServiceMockCtrl := gomock.NewController(t) server, _ := fake.NewTestServer(txServiceMockCtrl) client := gethrpc.DialInProc(server) - rpcClient, _ := rpc.NewClient(client, chainID, nil, appDB, nil) + + config := rpc.ClientConfig{ + Client: client, + UpstreamChainID: chainID, + Networks: nil, + DB: appDB, + WalletFeed: nil, + ProviderConfigs: nil, + } + rpcClient, _ := rpc.NewClient(config) + rpcClient.UpstreamChainID = chainID nm := network.NewManager(appDB) mediaServer, err := mediaserver.NewMediaServer(appDB, nil, nil, walletDB) diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 8da1a4ea5..426d14668 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -10,6 +10,10 @@ import ( "testing" "time" + "github.com/status-im/status-go/contracts" + "github.com/status-im/status-go/services/wallet/blockchainstate" + "github.com/status-im/status-go/t/utils" + "github.com/pkg/errors" "github.com/stretchr/testify/mock" "go.uber.org/mock/gomock" @@ -24,30 +28,26 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/appdatabase" - "github.com/status-im/status-go/contracts" "github.com/status-im/status-go/contracts/balancechecker" "github.com/status-im/status-go/contracts/ethscan" "github.com/status-im/status-go/contracts/ierc20" ethtypes "github.com/status-im/status-go/eth-node/types" - ethclient "github.com/status-im/status-go/rpc/chain/ethclient" - mock_client "github.com/status-im/status-go/rpc/chain/mock/client" - "github.com/status-im/status-go/rpc/chain/rpclimiter" - mock_rpcclient "github.com/status-im/status-go/rpc/mock/client" - "github.com/status-im/status-go/server" - "github.com/status-im/status-go/services/wallet/async" - "github.com/status-im/status-go/services/wallet/balance" - "github.com/status-im/status-go/services/wallet/blockchainstate" - "github.com/status-im/status-go/services/wallet/community" - "github.com/status-im/status-go/t/helpers" - "github.com/status-im/status-go/t/utils" - "github.com/status-im/status-go/multiaccounts/accounts" multicommon "github.com/status-im/status-go/multiaccounts/common" "github.com/status-im/status-go/params" statusRpc "github.com/status-im/status-go/rpc" + ethclient "github.com/status-im/status-go/rpc/chain/ethclient" + mock_client "github.com/status-im/status-go/rpc/chain/mock/client" + "github.com/status-im/status-go/rpc/chain/rpclimiter" + mock_rpcclient "github.com/status-im/status-go/rpc/mock/client" "github.com/status-im/status-go/rpc/network" + "github.com/status-im/status-go/server" + "github.com/status-im/status-go/services/wallet/async" + "github.com/status-im/status-go/services/wallet/balance" walletcommon "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/community" "github.com/status-im/status-go/services/wallet/token" + "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/transactions" "github.com/status-im/status-go/walletdatabase" ) @@ -1079,7 +1079,17 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo return nil } - client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) + + config := statusRpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := statusRpc.NewClient(config) + client.SetClient(tc.NetworkID(), tc) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) tokenManager.SetTokens([]*token.Token{ @@ -1342,7 +1352,16 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { currentBlock: 100, } - client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) + config := statusRpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := statusRpc.NewClient(config) + client.SetClient(tc.NetworkID(), tc) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) @@ -1466,7 +1485,16 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) { currentBlock: 100, } - client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) + config := statusRpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := statusRpc.NewClient(config) + client.SetClient(tc.NetworkID(), tc) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) @@ -1546,7 +1574,16 @@ func TestFetchNewBlocksCommand_nonceDetection(t *testing.T) { mediaServer, err := server.NewMediaServer(appdb, nil, nil, db) require.NoError(t, err) - client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) + config := statusRpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := statusRpc.NewClient(config) + client.SetClient(tc.NetworkID(), tc) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) @@ -1660,7 +1697,16 @@ func TestFetchNewBlocksCommand(t *testing.T) { } //tc.printPreparedData = true - client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) + config := statusRpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := statusRpc.NewClient(config) + client.SetClient(tc.NetworkID(), tc) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) @@ -1799,7 +1845,16 @@ func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing. db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) require.NoError(t, err) - client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) + config := statusRpc.ClientConfig{ + Client: nil, + UpstreamChainID: 1, + Networks: []params.Network{}, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + client, _ := statusRpc.NewClient(config) + maker, _ := contracts.NewContractMaker(client) wdb := NewDB(db) diff --git a/services/web3provider/api_test.go b/services/web3provider/api_test.go index 22b5b55a3..fe815a212 100644 --- a/services/web3provider/api_test.go +++ b/services/web3provider/api_test.go @@ -39,7 +39,15 @@ func setupTestAPI(t *testing.T) (*API, func()) { server, _ := fake.NewTestServer(txServiceMockCtrl) client := gethrpc.DialInProc(server) - rpcClient, err := statusRPC.NewClient(client, 1, nil, db, nil) + config := statusRPC.ClientConfig{ + Client: client, + UpstreamChainID: 1, + Networks: nil, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + rpcClient, err := statusRPC.NewClient(config) require.NoError(t, err) // import account keys diff --git a/transactions/transactor_test.go b/transactions/transactor_test.go index 7d6252939..b6348023d 100644 --- a/transactions/transactor_test.go +++ b/transactions/transactor_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" + statusRpc "github.com/status-im/status-go/rpc" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" gethtypes "github.com/ethereum/go-ethereum/core/types" @@ -26,7 +28,6 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/rpc" wallet_common "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/sqlite" "github.com/status-im/status-go/t/utils" @@ -60,13 +61,23 @@ func (s *TransactorSuite) SetupTest() { chainID := gethparams.AllEthashProtocolChanges.ChainID.Uint64() db, err := sqlite.OpenUnecryptedDB(sqlite.InMemoryPath) // dummy to make rpc.Client happy s.Require().NoError(err) - rpcClient, _ := rpc.NewClient(s.client, chainID, nil, db, nil) + + config := statusRpc.ClientConfig{ + Client: s.client, + UpstreamChainID: chainID, + Networks: nil, + DB: db, + WalletFeed: nil, + ProviderConfigs: nil, + } + rpcClient, _ := statusRpc.NewClient(config) + rpcClient.UpstreamChainID = chainID ethClients := []ethclient.RPSLimitedEthClientInterface{ ethclient.NewRPSLimitedEthClient(s.client, rpclimiter.NewRPCRpsLimiter(), "local-1-chain-id-1"), } - localClient := chain.NewClient(ethClients, chainID) + localClient := chain.NewClient(ethClients, chainID, nil) rpcClient.SetClient(chainID, localClient) nodeConfig, err := utils.MakeTestNodeConfigWithDataDir("", "/tmp", chainID)