fix_: address PR comments

This commit is contained in:
Andrey Bocharnikov 2024-10-08 13:46:50 +04:00
parent 3c9f9cb9c6
commit 2dd9cbec78
25 changed files with 408 additions and 192 deletions

View File

@ -323,11 +323,11 @@ func TestCircuitBreaker_SuccessCallStatus(t *testing.T) {
assert.Len(t, result.FunctorCallStatuses(), 1) assert.Len(t, result.FunctorCallStatuses(), 1)
status := result.FunctorCallStatuses()[0] status := result.FunctorCallStatuses()[0]
if status.name != "successCircuit" { if status.Name != "successCircuit" {
t.Errorf("Expected functor name to be 'successCircuit', got %s", status.name) t.Errorf("Expected functor name to be 'successCircuit', got %s", status.Name)
} }
if status.err != nil { if status.Err != nil {
t.Errorf("Expected no error in functor status, got %v", status.err) t.Errorf("Expected no error in functor status, got %v", status.Err)
} }
} }
@ -350,11 +350,11 @@ func TestCircuitBreaker_ErrorCallStatus(t *testing.T) {
assert.Len(t, result.FunctorCallStatuses(), 1) assert.Len(t, result.FunctorCallStatuses(), 1)
status := result.FunctorCallStatuses()[0] status := result.FunctorCallStatuses()[0]
if status.name != "errorCircuit" { if status.Name != "errorCircuit" {
t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.name) t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.Name)
} }
if !errors.Is(status.err, expectedError) { if !errors.Is(status.Err, expectedError) {
t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.err) t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.Err)
} }
} }
@ -405,11 +405,11 @@ func TestCircuitBreaker_MultipleFunctorsResult(t *testing.T) {
statuses := result.FunctorCallStatuses() statuses := result.FunctorCallStatuses()
require.Len(t, statuses, 2) require.Len(t, statuses, 2)
require.Equal(t, statuses[0].name, "circuit1") require.Equal(t, statuses[0].Name, "circuit1")
require.NotNil(t, statuses[0].err) require.NotNil(t, statuses[0].Err)
require.Equal(t, statuses[1].name, "circuit2") require.Equal(t, statuses[1].Name, "circuit2")
require.Nil(t, statuses[1].err) require.Nil(t, statuses[1].Err)
} }
func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) { func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
@ -444,9 +444,9 @@ func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
statuses := result.FunctorCallStatuses() statuses := result.FunctorCallStatuses()
require.Len(t, statuses, 2) require.Len(t, statuses, 2)
require.Equal(t, statuses[0].name, "circuitName") require.Equal(t, statuses[0].Name, "circuitName")
require.NotNil(t, statuses[0].err) require.NotNil(t, statuses[0].Err)
require.Equal(t, statuses[1].name, "circuitName") require.Equal(t, statuses[1].Name, "circuitName")
require.Nil(t, statuses[1].err) require.Nil(t, statuses[1].Err)
} }

View File

@ -10,14 +10,14 @@ import (
// Aggregator manages and aggregates the statuses of multiple providers. // Aggregator manages and aggregates the statuses of multiple providers.
type Aggregator struct { type Aggregator struct {
mu sync.RWMutex mu sync.RWMutex
Name string name string
providerStatuses map[string]*rpcstatus.ProviderStatus providerStatuses map[string]*rpcstatus.ProviderStatus
} }
// NewAggregator creates a new instance of Aggregator with the given name. // NewAggregator creates a new instance of Aggregator with the given name.
func NewAggregator(name string) *Aggregator { func NewAggregator(name string) *Aggregator {
return &Aggregator{ return &Aggregator{
Name: name, name: name,
providerStatuses: make(map[string]*rpcstatus.ProviderStatus), providerStatuses: make(map[string]*rpcstatus.ProviderStatus),
} }
} }
@ -100,7 +100,7 @@ func (a *Aggregator) ComputeAggregatedStatus() rpcstatus.ProviderStatus {
} }
aggregatedStatus := rpcstatus.ProviderStatus{ aggregatedStatus := rpcstatus.ProviderStatus{
Name: a.Name, Name: a.name,
LastSuccessAt: lastSuccessAt, LastSuccessAt: lastSuccessAt,
LastErrorAt: lastErrorAt, LastErrorAt: lastErrorAt,
LastError: lastError, LastError: lastError,

View File

@ -5,9 +5,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/status-im/status-go/healthmanager/rpcstatus"
) )
// StatusAggregatorTestSuite defines the test suite for Aggregator. // StatusAggregatorTestSuite defines the test suite for Aggregator.
@ -23,7 +24,7 @@ func (suite *StatusAggregatorTestSuite) SetupTest() {
// TestNewAggregator verifies that a new Aggregator is initialized correctly. // TestNewAggregator verifies that a new Aggregator is initialized correctly.
func (suite *StatusAggregatorTestSuite) TestNewAggregator() { func (suite *StatusAggregatorTestSuite) TestNewAggregator() {
assert.Equal(suite.T(), "TestAggregator", suite.aggregator.Name, "Aggregator name should be set correctly") assert.Equal(suite.T(), "TestAggregator", suite.aggregator.name, "Aggregator name should be set correctly")
assert.Empty(suite.T(), suite.aggregator.providerStatuses, "Aggregator should have no providers initially") assert.Empty(suite.T(), suite.aggregator.providerStatuses, "Aggregator should have no providers initially")
} }
@ -94,7 +95,7 @@ func (suite *StatusAggregatorTestSuite) TestUpdate() {
func (suite *StatusAggregatorTestSuite) TestComputeAggregatedStatus_NoProviders() { func (suite *StatusAggregatorTestSuite) TestComputeAggregatedStatus_NoProviders() {
aggStatus := suite.aggregator.ComputeAggregatedStatus() aggStatus := suite.aggregator.ComputeAggregatedStatus()
assert.Equal(suite.T(), rpcstatus.StatusUnknown, aggStatus.Status, "Aggregated status should be 'unknown' when no providers are registered") assert.Equal(suite.T(), rpcstatus.StatusDown, aggStatus.Status, "Aggregated status should be 'down' when no providers are registered")
assert.True(suite.T(), aggStatus.LastSuccessAt.IsZero(), "LastSuccessAt should be zero when no providers are registered") assert.True(suite.T(), aggStatus.LastSuccessAt.IsZero(), "LastSuccessAt should be zero when no providers are registered")
assert.True(suite.T(), aggStatus.LastErrorAt.IsZero(), "LastErrorAt should be zero when no providers are registered") assert.True(suite.T(), aggStatus.LastErrorAt.IsZero(), "LastErrorAt should be zero when no providers are registered")
} }

View File

@ -2,15 +2,16 @@ package healthmanager
import ( import (
"context" "context"
"fmt" "sync"
"github.com/status-im/status-go/healthmanager/aggregator" "github.com/status-im/status-go/healthmanager/aggregator"
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
"sync"
) )
// BlockchainFullStatus contains the full status of the blockchain, including provider statuses. // BlockchainFullStatus contains the full status of the blockchain, including provider statuses.
type BlockchainFullStatus struct { type BlockchainFullStatus struct {
Status rpcstatus.ProviderStatus `json:"status"` Status rpcstatus.ProviderStatus `json:"status"`
StatusPerChain map[uint64]rpcstatus.ProviderStatus `json:"statusPerChain"`
StatusPerChainPerProvider map[uint64]map[string]rpcstatus.ProviderStatus `json:"statusPerChainPerProvider"` StatusPerChainPerProvider map[uint64]map[string]rpcstatus.ProviderStatus `json:"statusPerChainPerProvider"`
} }
@ -24,11 +25,11 @@ type BlockchainStatus struct {
type BlockchainHealthManager struct { type BlockchainHealthManager struct {
mu sync.RWMutex mu sync.RWMutex
aggregator *aggregator.Aggregator aggregator *aggregator.Aggregator
subscribers []chan struct{} subscribers sync.Map // thread-safe
providers map[uint64]*ProvidersHealthManager providers map[uint64]*ProvidersHealthManager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
lastStatus BlockchainStatus lastStatus *BlockchainStatus
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -43,30 +44,37 @@ func NewBlockchainHealthManager() *BlockchainHealthManager {
} }
// RegisterProvidersHealthManager registers the provider health manager. // RegisterProvidersHealthManager registers the provider health manager.
// It prevents registering the same provider twice for the same chain. // It removes any existing provider for the same chain before registering the new one.
func (b *BlockchainHealthManager) RegisterProvidersHealthManager(ctx context.Context, phm *ProvidersHealthManager) error { func (b *BlockchainHealthManager) RegisterProvidersHealthManager(ctx context.Context, phm *ProvidersHealthManager) error {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
// Check if the provider for the given chainID is already registered chainID := phm.ChainID()
if _, exists := b.providers[phm.ChainID()]; exists {
// Log a warning or return an error to indicate that the provider is already registered // Check if a provider for the given chainID is already registered and remove it
return fmt.Errorf("provider for chainID %d is already registered", phm.ChainID()) if _, exists := b.providers[chainID]; exists {
// Cancel the existing context
if cancel, cancelExists := b.cancelFuncs[chainID]; cancelExists {
cancel()
}
// Remove the old registration
delete(b.providers, chainID)
delete(b.cancelFuncs, chainID)
} }
// Proceed with the registration // Proceed with the registration
b.providers[phm.ChainID()] = phm b.providers[chainID] = phm
// Create a new context for the provider // Create a new context for the provider
providerCtx, cancel := context.WithCancel(ctx) providerCtx, cancel := context.WithCancel(ctx)
b.cancelFuncs[phm.ChainID()] = cancel b.cancelFuncs[chainID] = cancel
statusCh := phm.Subscribe() statusCh := phm.Subscribe()
b.wg.Add(1) b.wg.Add(1)
go func(phm *ProvidersHealthManager, statusCh chan struct{}, providerCtx context.Context) { go func(phm *ProvidersHealthManager, statusCh chan struct{}, providerCtx context.Context) {
defer func() { defer func() {
b.wg.Done()
phm.Unsubscribe(statusCh) phm.Unsubscribe(statusCh)
b.wg.Done()
}() }()
for { for {
select { select {
@ -91,6 +99,7 @@ func (b *BlockchainHealthManager) Stop() {
cancel() cancel()
} }
clear(b.cancelFuncs) clear(b.cancelFuncs)
clear(b.providers)
b.mu.Unlock() b.mu.Unlock()
b.wg.Wait() b.wg.Wait()
@ -99,30 +108,31 @@ func (b *BlockchainHealthManager) Stop() {
// Subscribe allows clients to receive notifications about changes. // Subscribe allows clients to receive notifications about changes.
func (b *BlockchainHealthManager) Subscribe() chan struct{} { func (b *BlockchainHealthManager) Subscribe() chan struct{} {
ch := make(chan struct{}, 1) ch := make(chan struct{}, 1)
b.mu.Lock() b.subscribers.Store(ch, struct{}{})
defer b.mu.Unlock()
b.subscribers = append(b.subscribers, ch)
return ch return ch
} }
// Unsubscribe removes a subscriber from receiving notifications. // Unsubscribe removes a subscriber from receiving notifications.
func (b *BlockchainHealthManager) Unsubscribe(ch chan struct{}) { func (b *BlockchainHealthManager) Unsubscribe(ch chan struct{}) {
b.mu.Lock() b.subscribers.Delete(ch) // Удаляем подписчика из sync.Map
defer b.mu.Unlock() close(ch)
// Remove the subscriber channel from the list
for i, subscriber := range b.subscribers {
if subscriber == ch {
b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
close(ch)
break
}
}
} }
// aggregateAndUpdateStatus collects statuses from all providers and updates the overall and short status. // aggregateAndUpdateStatus collects statuses from all providers and updates the overall and short status.
func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context) { func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context) {
newShortStatus := b.aggregateStatus()
// If status has changed, update the last status and emit notifications
if b.shouldUpdateStatus(newShortStatus) {
b.updateStatus(newShortStatus)
b.emitBlockchainHealthStatus(ctx)
}
}
// aggregateStatus aggregates provider statuses and returns the new short status.
func (b *BlockchainHealthManager) aggregateStatus() BlockchainStatus {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
// Collect statuses from all providers // Collect statuses from all providers
providerStatuses := make([]rpcstatus.ProviderStatus, 0) providerStatuses := make([]rpcstatus.ProviderStatus, 0)
@ -134,16 +144,22 @@ func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context)
b.aggregator.UpdateBatch(providerStatuses) b.aggregator.UpdateBatch(providerStatuses)
// Get the new aggregated full and short status // Get the new aggregated full and short status
newShortStatus := b.getShortStatus() return b.getStatusPerChain()
b.mu.Unlock() }
// Compare full and short statuses and emit if changed // shouldUpdateStatus checks if the status has changed and needs to be updated.
if !compareShortStatus(newShortStatus, b.lastStatus) { func (b *BlockchainHealthManager) shouldUpdateStatus(newShortStatus BlockchainStatus) bool {
b.emitBlockchainHealthStatus(ctx) b.mu.RLock()
b.mu.Lock() defer b.mu.RUnlock()
defer b.mu.Unlock()
b.lastStatus = newShortStatus return b.lastStatus == nil || !compareShortStatus(newShortStatus, *b.lastStatus)
} }
// updateStatus updates the last known status with the new status.
func (b *BlockchainHealthManager) updateStatus(newShortStatus BlockchainStatus) {
b.mu.Lock()
defer b.mu.Unlock()
b.lastStatus = &newShortStatus
} }
// compareShortStatus compares two BlockchainStatus structs and returns true if they are identical. // compareShortStatus compares two BlockchainStatus structs and returns true if they are identical.
@ -167,18 +183,18 @@ func compareShortStatus(newStatus, previousStatus BlockchainStatus) bool {
// emitBlockchainHealthStatus sends a notification to all subscribers about the new blockchain status. // emitBlockchainHealthStatus sends a notification to all subscribers about the new blockchain status.
func (b *BlockchainHealthManager) emitBlockchainHealthStatus(ctx context.Context) { func (b *BlockchainHealthManager) emitBlockchainHealthStatus(ctx context.Context) {
b.mu.RLock() b.subscribers.Range(func(key, value interface{}) bool {
defer b.mu.RUnlock() subscriber := key.(chan struct{})
for _, subscriber := range b.subscribers {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// Stop sending notifications when the context is cancelled // Stop sending notifications when the context is cancelled
return return false
case subscriber <- struct{}{}: case subscriber <- struct{}{}:
default: default:
// Skip notification if the subscriber's channel is full // Skip notification if the subscriber's channel is full (non-blocking)
} }
} return true
})
} }
func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus { func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus {
@ -192,15 +208,16 @@ func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus {
statusPerChainPerProvider[chainID] = providerStatuses statusPerChainPerProvider[chainID] = providerStatuses
} }
blockchainStatus := b.aggregator.GetAggregatedStatus() statusPerChain := b.getStatusPerChain()
return BlockchainFullStatus{ return BlockchainFullStatus{
Status: blockchainStatus, Status: statusPerChain.Status,
StatusPerChain: statusPerChain.StatusPerChain,
StatusPerChainPerProvider: statusPerChainPerProvider, StatusPerChainPerProvider: statusPerChainPerProvider,
} }
} }
func (b *BlockchainHealthManager) getShortStatus() BlockchainStatus { func (b *BlockchainHealthManager) getStatusPerChain() BlockchainStatus {
statusPerChain := make(map[uint64]rpcstatus.ProviderStatus) statusPerChain := make(map[uint64]rpcstatus.ProviderStatus)
for chainID, phm := range b.providers { for chainID, phm := range b.providers {
@ -216,10 +233,10 @@ func (b *BlockchainHealthManager) getShortStatus() BlockchainStatus {
} }
} }
func (b *BlockchainHealthManager) GetShortStatus() BlockchainStatus { func (b *BlockchainHealthManager) GetStatusPerChain() BlockchainStatus {
b.mu.RLock() b.mu.RLock()
defer b.mu.RUnlock() defer b.mu.RUnlock()
return b.getShortStatus() return b.getStatusPerChain()
} }
// Status returns the current aggregated status. // Status returns the current aggregated status.

View File

@ -8,8 +8,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/status-im/status-go/healthmanager/rpcstatus"
) )
type BlockchainHealthManagerSuite struct { type BlockchainHealthManagerSuite struct {
@ -50,7 +51,8 @@ func (s *BlockchainHealthManagerSuite) assertBlockChainStatus(expected rpcstatus
// Test registering a provider health manager // Test registering a provider health manager
func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() { func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() {
phm := NewProvidersHealthManager(1) // Create a real ProvidersHealthManager phm := NewProvidersHealthManager(1) // Create a real ProvidersHealthManager
s.manager.RegisterProvidersHealthManager(context.Background(), phm) err := s.manager.RegisterProvidersHealthManager(context.Background(), phm)
s.Require().NoError(err)
// Verify that the provider is registered // Verify that the provider is registered
s.Require().NotNil(s.manager.providers[1]) s.Require().NotNil(s.manager.providers[1])
@ -59,7 +61,8 @@ func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() {
// Test status updates and notifications // Test status updates and notifications
func (s *BlockchainHealthManagerSuite) TestStatusUpdateNotification() { func (s *BlockchainHealthManagerSuite) TestStatusUpdateNotification() {
phm := NewProvidersHealthManager(1) phm := NewProvidersHealthManager(1)
s.manager.RegisterProvidersHealthManager(context.Background(), phm) err := s.manager.RegisterProvidersHealthManager(context.Background(), phm)
s.Require().NoError(err)
ch := s.manager.Subscribe() ch := s.manager.Subscribe()
// Update the provider status // Update the provider status
@ -75,8 +78,10 @@ func (s *BlockchainHealthManagerSuite) TestGetFullStatus() {
phm1 := NewProvidersHealthManager(1) phm1 := NewProvidersHealthManager(1)
phm2 := NewProvidersHealthManager(2) phm2 := NewProvidersHealthManager(2)
ctx := context.Background() ctx := context.Background()
s.manager.RegisterProvidersHealthManager(ctx, phm1) err := s.manager.RegisterProvidersHealthManager(ctx, phm1)
s.manager.RegisterProvidersHealthManager(ctx, phm2) s.Require().NoError(err)
err = s.manager.RegisterProvidersHealthManager(ctx, phm2)
s.Require().NoError(err)
ch := s.manager.Subscribe() ch := s.manager.Subscribe()
// Update the provider status // Update the provider status
@ -108,8 +113,15 @@ func (s *BlockchainHealthManagerSuite) TestConcurrentSubscriptionUnsubscription(
} }
wg.Wait() wg.Wait()
activeSubscribersCount := 0
s.manager.subscribers.Range(func(key, value interface{}) bool {
activeSubscribersCount++
return true
})
// After all subscribers are removed, there should be no active subscribers // After all subscribers are removed, there should be no active subscribers
s.Equal(0, len(s.manager.subscribers), "Expected no subscribers after unsubscription") s.Equal(0, activeSubscribersCount, "Expected no subscribers after unsubscription")
} }
func (s *BlockchainHealthManagerSuite) TestConcurrency() { func (s *BlockchainHealthManagerSuite) TestConcurrency() {
@ -120,7 +132,8 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() {
defer cancel() defer cancel()
for i := 1; i <= chainsCount; i++ { for i := 1; i <= chainsCount; i++ {
phm := NewProvidersHealthManager(uint64(i)) phm := NewProvidersHealthManager(uint64(i))
s.manager.RegisterProvidersHealthManager(ctx, phm) err := s.manager.RegisterProvidersHealthManager(ctx, phm)
s.Require().NoError(err)
} }
ch := s.manager.Subscribe() ch := s.manager.Subscribe()
@ -161,7 +174,8 @@ func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers()
// Create an instance of BlockchainHealthManager and register a provider manager // Create an instance of BlockchainHealthManager and register a provider manager
phm := NewProvidersHealthManager(1) phm := NewProvidersHealthManager(1)
ctx, cancel := context.WithCancel(s.ctx) ctx, cancel := context.WithCancel(s.ctx)
s.manager.RegisterProvidersHealthManager(ctx, phm) err := s.manager.RegisterProvidersHealthManager(ctx, phm)
s.Require().NoError(err)
defer cancel() defer cancel()
@ -196,7 +210,8 @@ func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers()
func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() { func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() {
// Register a provider for chain 1 // Register a provider for chain 1
phm := NewProvidersHealthManager(1) phm := NewProvidersHealthManager(1)
s.manager.RegisterProvidersHealthManager(s.ctx, phm) err := s.manager.RegisterProvidersHealthManager(s.ctx, phm)
s.Require().NoError(err)
// Subscribe to status updates // Subscribe to status updates
ch := s.manager.Subscribe() ch := s.manager.Subscribe()
@ -212,7 +227,7 @@ func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() {
s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond)
// Verify that the short status reflects the chain as down, since one provider is down // Verify that the short status reflects the chain as down, since one provider is down
shortStatus := s.manager.GetShortStatus() shortStatus := s.manager.GetStatusPerChain()
s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status) s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status)
s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[1].Status) // Chain 1 should be marked as down s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[1].Status) // Chain 1 should be marked as down
} }

View File

@ -2,10 +2,11 @@ package provider_errors
import ( import (
"errors" "errors"
"strings"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"strings"
) )
type RpcProviderErrorType string type RpcProviderErrorType string

View File

@ -5,24 +5,25 @@ import (
"fmt" "fmt"
"sync" "sync"
statusaggregator "github.com/status-im/status-go/healthmanager/aggregator" "github.com/status-im/status-go/healthmanager/aggregator"
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
) )
type ProvidersHealthManager struct { type ProvidersHealthManager struct {
mu sync.RWMutex mu sync.RWMutex
chainID uint64 chainID uint64
aggregator *statusaggregator.Aggregator aggregator *aggregator.Aggregator
subscribers []chan struct{} subscribers sync.Map // Use sync.Map for concurrent access to subscribers
lastStatus *rpcstatus.ProviderStatus
} }
// NewProvidersHealthManager creates a new instance of ProvidersHealthManager with the given chain ID. // NewProvidersHealthManager creates a new instance of ProvidersHealthManager with the given chain ID.
func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager { func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager {
aggregator := statusaggregator.NewAggregator(fmt.Sprintf("%d", chainID)) agg := aggregator.NewAggregator(fmt.Sprintf("%d", chainID))
return &ProvidersHealthManager{ return &ProvidersHealthManager{
chainID: chainID, chainID: chainID,
aggregator: aggregator, aggregator: agg,
} }
} }
@ -30,8 +31,6 @@ func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager {
func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcstatus.RpcProviderCallStatus) { func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcstatus.RpcProviderCallStatus) {
p.mu.Lock() p.mu.Lock()
previousStatus := p.aggregator.GetAggregatedStatus()
// Update the aggregator with the new provider statuses // Update the aggregator with the new provider statuses
for _, rpcCallStatus := range callStatuses { for _, rpcCallStatus := range callStatuses {
providerStatus := rpcstatus.NewRpcProviderStatus(rpcCallStatus) providerStatus := rpcstatus.NewRpcProviderStatus(rpcCallStatus)
@ -40,7 +39,7 @@ func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcs
newStatus := p.aggregator.GetAggregatedStatus() newStatus := p.aggregator.GetAggregatedStatus()
shouldEmit := newStatus.Status != previousStatus.Status shouldEmit := p.lastStatus == nil || p.lastStatus.Status != newStatus.Status
p.mu.Unlock() p.mu.Unlock()
if !shouldEmit { if !shouldEmit {
@ -48,6 +47,9 @@ func (p *ProvidersHealthManager) Update(ctx context.Context, callStatuses []rpcs
} }
p.emitChainStatus(ctx) p.emitChainStatus(ctx)
p.mu.Lock()
defer p.mu.Unlock()
p.lastStatus = &newStatus
} }
// GetStatuses returns a copy of the current provider statuses. // GetStatuses returns a copy of the current provider statuses.
@ -59,46 +61,35 @@ func (p *ProvidersHealthManager) GetStatuses() map[string]rpcstatus.ProviderStat
// Subscribe allows providers to receive notifications about changes. // Subscribe allows providers to receive notifications about changes.
func (p *ProvidersHealthManager) Subscribe() chan struct{} { func (p *ProvidersHealthManager) Subscribe() chan struct{} {
p.mu.Lock()
defer p.mu.Unlock()
ch := make(chan struct{}, 1) ch := make(chan struct{}, 1)
p.subscribers = append(p.subscribers, ch) p.subscribers.Store(ch, struct{}{})
return ch return ch
} }
// Unsubscribe removes a subscriber from receiving notifications. // Unsubscribe removes a subscriber from receiving notifications.
func (p *ProvidersHealthManager) Unsubscribe(ch chan struct{}) { func (p *ProvidersHealthManager) Unsubscribe(ch chan struct{}) {
p.mu.Lock() p.subscribers.Delete(ch)
defer p.mu.Unlock() close(ch)
for i, subscriber := range p.subscribers {
if subscriber == ch {
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
close(ch)
break
}
}
} }
// UnsubscribeAll removes all subscriber channels. // UnsubscribeAll removes all subscriber channels.
func (p *ProvidersHealthManager) UnsubscribeAll() { func (p *ProvidersHealthManager) UnsubscribeAll() {
p.mu.Lock() p.subscribers.Range(func(key, value interface{}) bool {
defer p.mu.Unlock() ch := key.(chan struct{})
for _, subscriber := range p.subscribers { close(ch)
close(subscriber) p.subscribers.Delete(key)
} return true
p.subscribers = nil })
} }
// Reset clears all provider statuses and resets the chain status to unknown. // Reset clears all provider statuses and resets the chain status to unknown.
func (p *ProvidersHealthManager) Reset() { func (p *ProvidersHealthManager) Reset() {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
p.aggregator = statusaggregator.NewAggregator(fmt.Sprintf("%d", p.chainID)) p.aggregator = aggregator.NewAggregator(fmt.Sprintf("%d", p.chainID))
} }
// Status Returns the current aggregated status // Status Returns the current aggregated status.
func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus { func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus {
p.mu.RLock() p.mu.RLock()
defer p.mu.RUnlock() defer p.mu.RUnlock()
@ -112,15 +103,15 @@ func (p *ProvidersHealthManager) ChainID() uint64 {
// emitChainStatus sends a notification to all subscribers. // emitChainStatus sends a notification to all subscribers.
func (p *ProvidersHealthManager) emitChainStatus(ctx context.Context) { func (p *ProvidersHealthManager) emitChainStatus(ctx context.Context) {
p.mu.RLock() p.subscribers.Range(func(key, value interface{}) bool {
defer p.mu.RUnlock() subscriber := key.(chan struct{})
for _, subscriber := range p.subscribers {
select { select {
case subscriber <- struct{}{}: case subscriber <- struct{}{}:
case <-ctx.Done(): case <-ctx.Done():
return return false // Stop sending if context is done
default: default:
// Non-blocking send; skip if the channel is full // Non-blocking send; skip if the channel is full
} }
} return true
})
} }

View File

@ -4,11 +4,13 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/stretchr/testify/suite"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/suite"
"github.com/status-im/status-go/healthmanager/rpcstatus"
) )
type ProvidersHealthManagerSuite struct { type ProvidersHealthManagerSuite struct {
@ -139,9 +141,12 @@ func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() {
phm1 := NewProvidersHealthManager(1) phm1 := NewProvidersHealthManager(1)
phm2 := NewProvidersHealthManager(2) phm2 := NewProvidersHealthManager(2)
phm3 := NewProvidersHealthManager(3) phm3 := NewProvidersHealthManager(3)
s.manager.RegisterProvidersHealthManager(s.ctx, phm1) err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1)
s.manager.RegisterProvidersHealthManager(s.ctx, phm2) s.Require().NoError(err)
s.manager.RegisterProvidersHealthManager(s.ctx, phm3) err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2)
s.Require().NoError(err)
err = s.manager.RegisterProvidersHealthManager(s.ctx, phm3)
s.Require().NoError(err)
// Subscribe to status updates // Subscribe to status updates
ch := s.manager.Subscribe() ch := s.manager.Subscribe()
@ -163,7 +168,7 @@ func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() {
s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond)
// Check that short status correctly reflects the mixed state // Check that short status correctly reflects the mixed state
shortStatus := s.manager.GetShortStatus() shortStatus := s.manager.GetStatusPerChain()
s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status) s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status)
s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down
s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[2].Status) // Chain 2 is still up s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[2].Status) // Chain 2 is still up
@ -174,8 +179,10 @@ func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() {
// Register providers for chains 1 and 2 // Register providers for chains 1 and 2
phm1 := NewProvidersHealthManager(1) phm1 := NewProvidersHealthManager(1)
phm2 := NewProvidersHealthManager(2) phm2 := NewProvidersHealthManager(2)
s.manager.RegisterProvidersHealthManager(s.ctx, phm1) err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1)
s.manager.RegisterProvidersHealthManager(s.ctx, phm2) s.Require().NoError(err)
err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2)
s.Require().NoError(err)
// Subscribe to status updates // Subscribe to status updates
ch := s.manager.Subscribe() ch := s.manager.Subscribe()
@ -195,7 +202,7 @@ func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() {
s.waitForUpdate(ch, rpcstatus.StatusDown, 100*time.Millisecond) s.waitForUpdate(ch, rpcstatus.StatusDown, 100*time.Millisecond)
// Check that short status reflects the final state where both chains are down // Check that short status reflects the final state where both chains are down
shortStatus := s.manager.GetShortStatus() shortStatus := s.manager.GetStatusPerChain()
s.Equal(rpcstatus.StatusDown, shortStatus.Status.Status) s.Equal(rpcstatus.StatusDown, shortStatus.Status.Status)
s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down
s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[2].Status) // Chain 2 is down s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[2].Status) // Chain 2 is down

View File

@ -17,11 +17,11 @@ const (
// ProviderStatus holds the status information for a single provider. // ProviderStatus holds the status information for a single provider.
type ProviderStatus struct { type ProviderStatus struct {
Name string Name string `json:"name"`
LastSuccessAt time.Time LastSuccessAt time.Time `json:"last_success_at"`
LastErrorAt time.Time LastErrorAt time.Time `json:"last_error_at"`
LastError error LastError error `json:"last_error"`
Status StatusType Status StatusType `json:"status"`
} }
// ProviderCallStatus represents the result of an arbitrary provider call. // ProviderCallStatus represents the result of an arbitrary provider call.

View File

@ -2,9 +2,10 @@ package rpcstatus
import ( import (
"errors" "errors"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"testing" "testing"
"time" "time"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
) )
func TestNewRpcProviderStatus(t *testing.T) { func TestNewRpcProviderStatus(t *testing.T) {

View File

@ -1,6 +1,7 @@
package node package node
import ( import (
"context"
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
@ -331,7 +332,15 @@ func (n *StatusNode) setupRPCClient() (err error) {
}, },
} }
n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.NetworkID, n.config.Networks, n.appDB, &n.walletFeed, providerConfigs) config := rpc.ClientConfig{
Client: gethNodeClient,
UpstreamChainID: n.config.NetworkID,
Networks: n.config.Networks,
DB: n.appDB,
WalletFeed: &n.walletFeed,
ProviderConfigs: providerConfigs,
}
n.rpcClient, err = rpc.NewClient(config)
n.rpcClient.Start(context.Background()) n.rpcClient.Start(context.Background())
if err != nil { if err != nil {
return return

View File

@ -5,19 +5,22 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/healthmanager/rpcstatus"
mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient"
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/healthmanager"
"github.com/ethereum/go-ethereum/core/types" "github.com/status-im/status-go/healthmanager/rpcstatus"
mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
"github.com/status-im/status-go/rpc/chain/ethclient"
) )
type BlockchainHealthManagerSuite struct { type BlockchainHealthManagerSuite struct {
@ -53,7 +56,8 @@ func (s *BlockchainHealthManagerSuite) setupClients(chainIDs []uint64) {
phm := healthmanager.NewProvidersHealthManager(chainID) phm := healthmanager.NewProvidersHealthManager(chainID)
client := NewClient([]ethclient.RPSLimitedEthClientInterface{mockEthClient}, chainID, phm) client := NewClient([]ethclient.RPSLimitedEthClientInterface{mockEthClient}, chainID, phm)
s.blockchainHealthManager.RegisterProvidersHealthManager(ctx, phm) err := s.blockchainHealthManager.RegisterProvidersHealthManager(ctx, phm)
require.NoError(s.T(), err)
s.mockProviders[chainID] = phm s.mockProviders[chainID] = phm
s.mockEthClients[chainID] = mockEthClient s.mockEthClients[chainID] = mockEthClient
@ -272,7 +276,7 @@ func (s *BlockchainHealthManagerSuite) TestGetShortStatus() {
s.waitForStatus(statusCh, rpcstatus.StatusUp) s.waitForStatus(statusCh, rpcstatus.StatusUp)
// Get the short status from the BlockchainHealthManager // Get the short status from the BlockchainHealthManager
shortStatus := s.blockchainHealthManager.GetShortStatus() shortStatus := s.blockchainHealthManager.GetStatusPerChain()
// Assert overall blockchain status // Assert overall blockchain status
require.Equal(s.T(), rpcstatus.StatusUp, shortStatus.Status.Status) require.Equal(s.T(), rpcstatus.StatusUp, shortStatus.Status.Status)

View File

@ -3,19 +3,21 @@ package chain
import ( import (
"context" "context"
"errors" "errors"
"github.com/ethereum/go-ethereum/core/vm"
"strconv" "strconv"
"testing" "testing"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
healthManager "github.com/status-im/status-go/healthmanager" healthManager "github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient" mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient"
) )
@ -162,7 +164,7 @@ func (s *ClientWithFallbackSuite) TestVMErrorDoesNotMarkChainDown() {
require.Equal(s.T(), providerStatuses["test0"].Status, rpcstatus.StatusUp) require.Equal(s.T(), providerStatuses["test0"].Status, rpcstatus.StatusUp)
} }
func (s *ClientWithFallbackSuite) TestNoClientsChainUnknown() { func (s *ClientWithFallbackSuite) TestNoClientsChainDown() {
s.setupClients(0) s.setupClients(0)
ctx := context.Background() ctx := context.Background()
@ -174,7 +176,7 @@ func (s *ClientWithFallbackSuite) TestNoClientsChainUnknown() {
// THEN // THEN
chainStatus := s.providersHealthManager.Status() chainStatus := s.providersHealthManager.Status()
require.Equal(s.T(), rpcstatus.StatusUnknown, chainStatus.Status) require.Equal(s.T(), rpcstatus.StatusDown, chainStatus.Status)
} }
func (s *ClientWithFallbackSuite) TestAllClientsDifferentErrors() { func (s *ClientWithFallbackSuite) TestAllClientsDifferentErrors() {
@ -228,11 +230,11 @@ func (s *ClientWithFallbackSuite) TestAllClientsNetworkErrors() {
require.Equal(s.T(), providerStatuses["test2"].Status, rpcstatus.StatusDown) require.Equal(s.T(), providerStatuses["test2"].Status, rpcstatus.StatusDown)
} }
func (s *ClientWithFallbackSuite) TestChainStatusUnknownWhenAllProvidersUnknown() { func (s *ClientWithFallbackSuite) TestChainStatusDownWhenInitial() {
s.setupClients(2) s.setupClients(2)
chainStatus := s.providersHealthManager.Status() chainStatus := s.providersHealthManager.Status()
require.Equal(s.T(), rpcstatus.StatusUnknown, chainStatus.Status) require.Equal(s.T(), rpcstatus.StatusDown, chainStatus.Status)
} }
func TestClientWithFallbackSuite(t *testing.T) { func TestClientWithFallbackSuite(t *testing.T) {

View File

@ -121,37 +121,47 @@ type Client struct {
// Is initialized in a build-tag-dependent module // Is initialized in a build-tag-dependent module
var verifProxyInitFn func(c *Client) var verifProxyInitFn func(c *Client)
// ClientConfig holds the configuration for initializing a new Client.
type ClientConfig struct {
Client *gethrpc.Client
UpstreamChainID uint64
Networks []params.Network
DB *sql.DB
WalletFeed *event.Feed
ProviderConfigs []params.ProviderConfig
}
// NewClient initializes Client // NewClient initializes Client
// //
// Client is safe for concurrent use and will automatically // Client is safe for concurrent use and will automatically
// reconnect to the server if connection is lost. // reconnect to the server if connection is lost.
func NewClient(client *gethrpc.Client, upstreamChainID uint64, networks []params.Network, db *sql.DB, walletFeed *event.Feed, providerConfigs []params.ProviderConfig) (*Client, error) { func NewClient(config ClientConfig) (*Client, error) {
var err error var err error
log := log.New("package", "status-go/rpc.Client") log := log.New("package", "status-go/rpc.Client")
networkManager := network.NewManager(db) networkManager := network.NewManager(config.DB)
if networkManager == nil { if networkManager == nil {
return nil, errors.New("failed to create network manager") return nil, errors.New("failed to create network manager")
} }
err = networkManager.Init(networks) err = networkManager.Init(config.Networks)
if err != nil { if err != nil {
log.Error("Network manager failed to initialize", "error", err) log.Error("Network manager failed to initialize", "error", err)
} }
c := Client{ c := Client{
local: client, local: config.Client,
NetworkManager: networkManager, NetworkManager: networkManager,
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface), rpcClients: make(map[uint64]chain.ClientInterface),
limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter), limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter),
log: log, log: log,
providerConfigs: providerConfigs, providerConfigs: config.ProviderConfigs,
healthMgr: healthmanager.NewBlockchainHealthManager(), healthMgr: healthmanager.NewBlockchainHealthManager(),
walletFeed: walletFeed, walletFeed: config.WalletFeed,
} }
c.UpstreamChainID = upstreamChainID c.UpstreamChainID = config.UpstreamChainID
c.router = newRouter(true) c.router = newRouter(true)
if verifProxyInitFn != nil { if verifProxyInitFn != nil {
@ -268,9 +278,13 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
} }
phm := healthmanager.NewProvidersHealthManager(chainID) phm := healthmanager.NewProvidersHealthManager(chainID)
c.healthMgr.RegisterProvidersHealthManager(context.Background(), phm) err := c.healthMgr.RegisterProvidersHealthManager(context.Background(), phm)
if err != nil {
return nil, fmt.Errorf("register providers health manager: %s", err)
}
client := chain.NewClient(ethClients, chainID, phm) client := chain.NewClient(ethClients, chainID, phm)
client.SetWalletNotifier(c.walletNotifier)
c.rpcClients[chainID] = client c.rpcClients[chainID] = client
return client, nil return client, nil
} }
@ -439,9 +453,10 @@ func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result in
if c.router.routeRemote(method) { if c.router.routeRemote(method) {
client, err := c.getClientUsingCache(chainID) client, err := c.getClientUsingCache(chainID)
if err == nil { if err != nil {
return client.CallContext(ctx, result, method, args...) return err
} }
return client.CallContext(ctx, result, method, args...)
} }
if c.local == nil { if c.local == nil {

View File

@ -44,7 +44,15 @@ func TestBlockedRoutesCall(t *testing.T) {
gethRPCClient, err := gethrpc.Dial(ts.URL) gethRPCClient, err := gethrpc.Dial(ts.URL)
require.NoError(t, err) require.NoError(t, err)
c, err := NewClient(gethRPCClient, 1, []params.Network{}, db, nil, nil) config := ClientConfig{
Client: gethRPCClient,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
c, err := NewClient(config)
require.NoError(t, err) require.NoError(t, err)
for _, m := range blockedMethods { for _, m := range blockedMethods {
@ -83,7 +91,15 @@ func TestBlockedRoutesRawCall(t *testing.T) {
gethRPCClient, err := gethrpc.Dial(ts.URL) gethRPCClient, err := gethrpc.Dial(ts.URL)
require.NoError(t, err) require.NoError(t, err)
c, err := NewClient(gethRPCClient, 1, []params.Network{}, db, nil, nil) config := ClientConfig{
Client: gethRPCClient,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
c, err := NewClient(config)
require.NoError(t, err) require.NoError(t, err)
for _, m := range blockedMethods { for _, m := range blockedMethods {
@ -142,7 +158,17 @@ func TestGetClientsUsingCache(t *testing.T) {
DefaultFallbackURL2: server.URL + path3, DefaultFallbackURL2: server.URL + path3,
}, },
} }
c, err := NewClient(nil, 1, networks, db, nil, providerConfigs)
config := ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: networks,
DB: db,
WalletFeed: nil,
ProviderConfigs: providerConfigs,
}
c, err := NewClient(config)
require.NoError(t, err) require.NoError(t, err)
// Networks from DB must pick up DefaultRPCURL, DefaultFallbackURL, DefaultFallbackURL2 // Networks from DB must pick up DefaultRPCURL, DefaultFallbackURL, DefaultFallbackURL2

View File

@ -48,7 +48,16 @@ func (s *ProxySuite) startRpcClient(infuraURL string) *Client {
db, close := setupTestNetworkDB(s.T()) db, close := setupTestNetworkDB(s.T())
defer close() defer close()
c, err := NewClient(gethRPCClient, 1, []params.Network{}, db)
config := ClientConfig{
Client: gethRPCClient,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
c, err := NewClient(config)
require.NoError(s.T(), err) require.NoError(s.T(), err)
return c return c

View File

@ -33,7 +33,15 @@ func setupTestAPI(t *testing.T) (*API, func()) {
_ = client _ = client
rpcClient, err := statusRPC.NewClient(nil, 1, nil, db, nil) config := statusRPC.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: nil,
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
rpcClient, err := statusRPC.NewClient(config)
require.NoError(t, err) require.NoError(t, err)
// import account keys // import account keys

View File

@ -144,7 +144,15 @@ func TestAPI_GetAddressDetails(t *testing.T) {
DefaultFallbackURL: serverWith1SecDelay.URL, DefaultFallbackURL: serverWith1SecDelay.URL,
}, },
} }
c, err := rpc.NewClient(nil, chainID, networks, appDB, providerConfigs) config := rpc.ClientConfig{
Client: nil,
UpstreamChainID: chainID,
Networks: networks,
DB: appDB,
WalletFeed: nil,
ProviderConfigs: providerConfigs,
}
c, err := rpc.NewClient(config)
require.NoError(t, err) require.NoError(t, err)
chainClient, err := c.EthClient(chainID) chainClient, err := c.EthClient(chainID)

View File

@ -404,7 +404,16 @@ func Test_removeBalanceHistoryOnEventAccountRemoved(t *testing.T) {
txServiceMockCtrl := gomock.NewController(t) txServiceMockCtrl := gomock.NewController(t)
server, _ := fake.NewTestServer(txServiceMockCtrl) server, _ := fake.NewTestServer(txServiceMockCtrl)
client := gethrpc.DialInProc(server) client := gethrpc.DialInProc(server)
rpcClient, _ := rpc.NewClient(client, chainID, nil, appDB, nil)
config := rpc.ClientConfig{
Client: client,
UpstreamChainID: chainID,
Networks: nil,
DB: appDB,
WalletFeed: nil,
ProviderConfigs: nil,
}
rpcClient, _ := rpc.NewClient(config)
rpcClient.UpstreamChainID = chainID rpcClient.UpstreamChainID = chainID
service := NewService(walletDB, accountsDB, &accountFeed, &walletFeed, rpcClient, nil, nil, nil) service := NewService(walletDB, accountsDB, &accountFeed, &walletFeed, rpcClient, nil, nil, nil)

View File

@ -51,7 +51,7 @@ func (s *MarketTestSuite) TestEventOnRpsError() {
s.Require().Equal(event.Type, EventMarketStatusChanged) s.Require().Equal(event.Type, EventMarketStatusChanged)
} }
func (s *MarketTestSuite) TestNoEventOnNetworkError() { func (s *MarketTestSuite) TestEventOnNetworkError() {
ctrl := gomock.NewController(s.T()) ctrl := gomock.NewController(s.T())
defer ctrl.Finish() defer ctrl.Finish()
@ -62,10 +62,11 @@ func (s *MarketTestSuite) TestNoEventOnNetworkError() {
_, err := manager.FetchPrices(s.symbols, s.currencies) _, err := manager.FetchPrices(s.symbols, s.currencies)
s.Require().Error(err, "expected error from FetchPrices due to MockPriceProviderWithError") s.Require().Error(err, "expected error from FetchPrices due to MockPriceProviderWithError")
_, ok := s.feedSub.WaitForEvent(time.Millisecond * 500) event, ok := s.feedSub.WaitForEvent(500 * time.Millisecond)
s.Require().True(ok, "expected an event, but none was received")
//THEN // THEN
s.Require().False(ok, "expected no event, but one was received") s.Require().Equal(event.Type, EventMarketStatusChanged)
} }
func TestMarketTestSuite(t *testing.T) { func TestMarketTestSuite(t *testing.T) {

View File

@ -91,7 +91,15 @@ func setupTestNetworkDB(t *testing.T) (*sql.DB, func()) {
func setupRouter(t *testing.T) (*Router, func()) { func setupRouter(t *testing.T) (*Router, func()) {
db, cleanTmpDb := setupTestNetworkDB(t) db, cleanTmpDb := setupTestNetworkDB(t)
client, _ := rpc.NewClient(nil, 1, defaultNetworks, db, nil) config := rpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: defaultNetworks,
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := rpc.NewClient(config)
router := NewRouter(client, nil, nil, nil, nil, nil, nil, nil) router := NewRouter(client, nil, nil, nil, nil, nil, nil, nil)

View File

@ -331,7 +331,17 @@ func Test_removeTokenBalanceOnEventAccountRemoved(t *testing.T) {
txServiceMockCtrl := gomock.NewController(t) txServiceMockCtrl := gomock.NewController(t)
server, _ := fake.NewTestServer(txServiceMockCtrl) server, _ := fake.NewTestServer(txServiceMockCtrl)
client := gethrpc.DialInProc(server) client := gethrpc.DialInProc(server)
rpcClient, _ := rpc.NewClient(client, chainID, nil, appDB, nil)
config := rpc.ClientConfig{
Client: client,
UpstreamChainID: chainID,
Networks: nil,
DB: appDB,
WalletFeed: nil,
ProviderConfigs: nil,
}
rpcClient, _ := rpc.NewClient(config)
rpcClient.UpstreamChainID = chainID rpcClient.UpstreamChainID = chainID
nm := network.NewManager(appDB) nm := network.NewManager(appDB)
mediaServer, err := mediaserver.NewMediaServer(appDB, nil, nil, walletDB) mediaServer, err := mediaserver.NewMediaServer(appDB, nil, nil, walletDB)

View File

@ -10,6 +10,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/status-im/status-go/contracts"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/t/utils"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
@ -24,30 +28,26 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/contracts"
"github.com/status-im/status-go/contracts/balancechecker" "github.com/status-im/status-go/contracts/balancechecker"
"github.com/status-im/status-go/contracts/ethscan" "github.com/status-im/status-go/contracts/ethscan"
"github.com/status-im/status-go/contracts/ierc20" "github.com/status-im/status-go/contracts/ierc20"
ethtypes "github.com/status-im/status-go/eth-node/types" ethtypes "github.com/status-im/status-go/eth-node/types"
ethclient "github.com/status-im/status-go/rpc/chain/ethclient"
mock_client "github.com/status-im/status-go/rpc/chain/mock/client"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
mock_rpcclient "github.com/status-im/status-go/rpc/mock/client"
"github.com/status-im/status-go/server"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/community"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
"github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/accounts"
multicommon "github.com/status-im/status-go/multiaccounts/common" multicommon "github.com/status-im/status-go/multiaccounts/common"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
statusRpc "github.com/status-im/status-go/rpc" statusRpc "github.com/status-im/status-go/rpc"
ethclient "github.com/status-im/status-go/rpc/chain/ethclient"
mock_client "github.com/status-im/status-go/rpc/chain/mock/client"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
mock_rpcclient "github.com/status-im/status-go/rpc/mock/client"
"github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/server"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
walletcommon "github.com/status-im/status-go/services/wallet/common" walletcommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/community"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
"github.com/status-im/status-go/walletdatabase" "github.com/status-im/status-go/walletdatabase"
) )
@ -1079,7 +1079,17 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo
return nil return nil
} }
client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil)
config := statusRpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc) client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager.SetTokens([]*token.Token{ tokenManager.SetTokens([]*token.Token{
@ -1342,7 +1352,16 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
currentBlock: 100, currentBlock: 100,
} }
client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) config := statusRpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc) client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
@ -1466,7 +1485,16 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) {
currentBlock: 100, currentBlock: 100,
} }
client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) config := statusRpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc) client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
@ -1546,7 +1574,16 @@ func TestFetchNewBlocksCommand_nonceDetection(t *testing.T) {
mediaServer, err := server.NewMediaServer(appdb, nil, nil, db) mediaServer, err := server.NewMediaServer(appdb, nil, nil, db)
require.NoError(t, err) require.NoError(t, err)
client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) config := statusRpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc) client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
@ -1660,7 +1697,16 @@ func TestFetchNewBlocksCommand(t *testing.T) {
} }
//tc.printPreparedData = true //tc.printPreparedData = true
client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) config := statusRpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc) client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db)) tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
@ -1799,7 +1845,16 @@ func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing.
db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(t, err) require.NoError(t, err)
client, _ := statusRpc.NewClient(nil, 1, []params.Network{}, db, nil) config := statusRpc.ClientConfig{
Client: nil,
UpstreamChainID: 1,
Networks: []params.Network{},
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
client, _ := statusRpc.NewClient(config)
maker, _ := contracts.NewContractMaker(client) maker, _ := contracts.NewContractMaker(client)
wdb := NewDB(db) wdb := NewDB(db)

View File

@ -39,7 +39,15 @@ func setupTestAPI(t *testing.T) (*API, func()) {
server, _ := fake.NewTestServer(txServiceMockCtrl) server, _ := fake.NewTestServer(txServiceMockCtrl)
client := gethrpc.DialInProc(server) client := gethrpc.DialInProc(server)
rpcClient, err := statusRPC.NewClient(client, 1, nil, db, nil) config := statusRPC.ClientConfig{
Client: client,
UpstreamChainID: 1,
Networks: nil,
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
rpcClient, err := statusRPC.NewClient(config)
require.NoError(t, err) require.NoError(t, err)
// import account keys // import account keys

View File

@ -14,6 +14,8 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
statusRpc "github.com/status-im/status-go/rpc"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
gethtypes "github.com/ethereum/go-ethereum/core/types" gethtypes "github.com/ethereum/go-ethereum/core/types"
@ -26,7 +28,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc"
wallet_common "github.com/status-im/status-go/services/wallet/common" wallet_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/sqlite" "github.com/status-im/status-go/sqlite"
"github.com/status-im/status-go/t/utils" "github.com/status-im/status-go/t/utils"
@ -60,13 +61,23 @@ func (s *TransactorSuite) SetupTest() {
chainID := gethparams.AllEthashProtocolChanges.ChainID.Uint64() chainID := gethparams.AllEthashProtocolChanges.ChainID.Uint64()
db, err := sqlite.OpenUnecryptedDB(sqlite.InMemoryPath) // dummy to make rpc.Client happy db, err := sqlite.OpenUnecryptedDB(sqlite.InMemoryPath) // dummy to make rpc.Client happy
s.Require().NoError(err) s.Require().NoError(err)
rpcClient, _ := rpc.NewClient(s.client, chainID, nil, db, nil)
config := statusRpc.ClientConfig{
Client: s.client,
UpstreamChainID: chainID,
Networks: nil,
DB: db,
WalletFeed: nil,
ProviderConfigs: nil,
}
rpcClient, _ := statusRpc.NewClient(config)
rpcClient.UpstreamChainID = chainID rpcClient.UpstreamChainID = chainID
ethClients := []ethclient.RPSLimitedEthClientInterface{ ethClients := []ethclient.RPSLimitedEthClientInterface{
ethclient.NewRPSLimitedEthClient(s.client, rpclimiter.NewRPCRpsLimiter(), "local-1-chain-id-1"), ethclient.NewRPSLimitedEthClient(s.client, rpclimiter.NewRPCRpsLimiter(), "local-1-chain-id-1"),
} }
localClient := chain.NewClient(ethClients, chainID) localClient := chain.NewClient(ethClients, chainID, nil)
rpcClient.SetClient(chainID, localClient) rpcClient.SetClient(chainID, localClient)
nodeConfig, err := utils.MakeTestNodeConfigWithDataDir("", "/tmp", chainID) nodeConfig, err := utils.MakeTestNodeConfigWithDataDir("", "/tmp", chainID)