chore(network)_: implement networks events

This commit is contained in:
Dario Gabriel Lipicar 2025-01-31 15:23:07 -03:00 committed by dlipicar
parent c862b563c0
commit 78d05d74e9
14 changed files with 239 additions and 20 deletions

View File

@ -140,6 +140,8 @@ type StatusNode struct {
accountsFeed event.Feed
walletFeed event.Feed
networksFeed event.Feed
settingsFeed event.Feed
}
// New makes new instance of StatusNode.
@ -341,7 +343,10 @@ func (n *StatusNode) setupRPCClient() (err error) {
UpstreamChainID: n.config.NetworkID,
Networks: n.config.Networks,
DB: n.appDB,
AccountsFeed: &n.accountsFeed,
WalletFeed: &n.walletFeed,
SettingsFeed: &n.settingsFeed,
NetworksFeed: &n.networksFeed,
}
n.rpcClient, err = rpc.NewClient(config)
if err != nil {

View File

@ -77,13 +77,12 @@ var (
)
func (b *StatusNode) initServices(config *params.NodeConfig, mediaServer *server.MediaServer) error {
settingsFeed := &event.Feed{}
accDB, err := accounts.NewDB(b.appDB)
if err != nil {
return err
}
setSettingsNotifier(accDB, settingsFeed)
setSettingsNotifier(accDB, &b.settingsFeed)
services := []common.StatusService{}
services = append(services, b.rpcFiltersService())
@ -111,7 +110,7 @@ func (b *StatusNode) initServices(config *params.NodeConfig, mediaServer *server
// Wallet Service is used by wakuExtSrvc/wakuV2ExtSrvc
// Keep this initialization before the other two
if config.WalletConfig.Enabled {
walletService := b.walletService(accDB, b.appDB, &b.accountsFeed, settingsFeed, &b.walletFeed, config.WalletConfig.StatusProxyStageName)
walletService := b.walletService(accDB, b.appDB, &b.accountsFeed, &b.networksFeed, &b.walletFeed, config.WalletConfig.StatusProxyStageName)
services = append(services, walletService)
}
@ -589,10 +588,10 @@ func (b *StatusNode) SetWalletCommunityInfoProvider(provider thirdparty.Communit
}
}
func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, accountsFeed *event.Feed, settingsFeed *event.Feed, walletFeed *event.Feed, statusProxyStageName string) *wallet.Service {
func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, accountsFeed *event.Feed, networksFeed *event.Feed, walletFeed *event.Feed, statusProxyStageName string) *wallet.Service {
if b.walletSrvc == nil {
b.walletSrvc = wallet.NewService(
b.walletDB, accountsDB, appDB, b.rpcClient, accountsFeed, settingsFeed, b.gethAccountManager, b.transactor, b.config,
b.walletDB, accountsDB, appDB, b.rpcClient, accountsFeed, networksFeed, b.gethAccountManager, b.transactor, b.config,
b.ensService(b.timeSourceNow()).API().EnsResolver(),
b.pendingTracker,
walletFeed,

View File

@ -105,7 +105,10 @@ type Client struct {
healthMgr *healthmanager.BlockchainHealthManager
stopMonitoringFunc context.CancelFunc
accountsFeed *event.Feed
walletFeed *event.Feed
settingsFeed *event.Feed
networksFeed *event.Feed
handlersMx sync.RWMutex // mx guards handlers
handlers map[string]Handler // locally registered handlers
@ -123,7 +126,10 @@ type ClientConfig struct {
UpstreamChainID uint64
Networks []params.Network
DB *sql.DB
AccountsFeed *event.Feed
WalletFeed *event.Feed
SettingsFeed *event.Feed
NetworksFeed *event.Feed
}
// NewClient initializes Client
@ -132,7 +138,7 @@ type ClientConfig struct {
// reconnect to the server if connection is lost.
func NewClient(config ClientConfig) (*Client, error) {
logger := logutils.ZapLogger().Named("rpcClient")
networkManager := network.NewManager(config.DB)
networkManager := network.NewManager(config.DB, config.AccountsFeed, config.SettingsFeed, config.NetworksFeed)
if networkManager == nil {
return nil, errors.New("failed to create network manager")
}
@ -151,7 +157,10 @@ func NewClient(config ClientConfig) (*Client, error) {
limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter),
logger: logger,
healthMgr: healthmanager.NewBlockchainHealthManager(),
accountsFeed: config.AccountsFeed,
walletFeed: config.WalletFeed,
settingsFeed: config.SettingsFeed,
networksFeed: config.NetworksFeed,
}
c.UpstreamChainID = config.UpstreamChainID
@ -165,6 +174,8 @@ func NewClient(config ClientConfig) (*Client, error) {
}
func (c *Client) Start(ctx context.Context) {
c.NetworkManager.Start()
if c.stopMonitoringFunc != nil {
c.logger.Warn("Blockchain health manager already started")
return
@ -177,6 +188,8 @@ func (c *Client) Start(ctx context.Context) {
}
func (c *Client) Stop() {
c.NetworkManager.Stop()
c.healthMgr.Stop()
if c.stopMonitoringFunc == nil {
return

View File

@ -6,13 +6,18 @@ import (
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/errors"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/params/networkhelper"
"github.com/status-im/status-go/services/accounts/settingsevent"
persistence "github.com/status-im/status-go/rpc/network/db"
"github.com/status-im/status-go/rpc/network/networksevent"
)
//go:generate mockgen -package=mock -source=network.go -destination=mock/network.go
@ -49,11 +54,16 @@ type Manager struct {
networkPersistence persistence.NetworksPersistenceInterface
embeddedNetworks []params.Network
accountFeed *event.Feed
settingsFeed *event.Feed
networksFeed *event.Feed
settingsWatcher *settingsevent.Watcher
logger *zap.Logger
}
// NewManager creates a new instance of Manager.
func NewManager(db *sql.DB) *Manager {
func NewManager(db *sql.DB, accountFeed *event.Feed, settingsFeed *event.Feed, networksFeed *event.Feed) *Manager {
accountsDB, err := accounts.NewDB(db)
if err != nil {
return nil
@ -65,10 +75,85 @@ func NewManager(db *sql.DB) *Manager {
db: db,
accountsDB: accountsDB,
networkPersistence: persistence.NewNetworksPersistence(db),
accountFeed: accountFeed,
settingsFeed: settingsFeed,
networksFeed: networksFeed,
logger: logger,
}
}
func (nm *Manager) Start() {
if nm.settingsWatcher == nil {
settingChangeCb := func(setting settings.SettingField, value interface{}) {
if setting.Equals(settings.TestNetworksEnabled) {
nm.onTestNetworksEnabledChanged()
}
}
nm.settingsWatcher = settingsevent.NewWatcher(nm.settingsFeed, settingChangeCb)
nm.settingsWatcher.Start()
}
}
func (nm *Manager) Stop() {
if nm.settingsWatcher != nil {
nm.settingsWatcher.Stop()
nm.settingsWatcher = nil
}
}
func (nm *Manager) onTestNetworksEnabledChanged() {
areTestNetworksEnabled, err := nm.GetTestNetworksEnabled()
if err != nil {
nm.logger.Error("failed to get test networks enabled", zap.Error(err))
return
}
oldActiveNetworks, err := nm.getActiveNetworksForTestMode(!areTestNetworksEnabled)
if err != nil {
nm.logger.Error("failed to get old active networks", zap.Error(err))
return
}
newActiveNetworks, err := nm.getActiveNetworksForTestMode(areTestNetworksEnabled)
if err != nil {
nm.logger.Error("failed to get new active networks", zap.Error(err))
return
}
nm.notifyActiveNetworksChange(newActiveNetworks, oldActiveNetworks)
}
func (nm *Manager) notifyActiveNetworksChange(activatedNetworks []*params.Network, deactivatedNetworks []*params.Network) {
if nm.networksFeed == nil {
return
}
currentActiveNetworks, err := nm.GetActiveNetworks()
if err != nil {
nm.logger.Error("failed to get active networks", zap.Error(err))
return
}
params := &networksevent.ActiveNetworksChangedParams{
CurrentActiveChainIDs: networksToChainIDs(currentActiveNetworks),
ActivatedChainIDs: networksToChainIDs(activatedNetworks),
DeactivatedChainIDs: networksToChainIDs(deactivatedNetworks),
}
nm.networksFeed.Send(networksevent.Event{
Type: networksevent.EventTypeActiveNetworksChanged,
ActiveNetworksChangedParams: params,
})
}
func networksToChainIDs(networks []*params.Network) []uint64 {
chainIDs := make([]uint64, 0, len(networks))
for _, network := range networks {
chainIDs = append(chainIDs, network.ChainID)
}
return chainIDs
}
// Init initializes the nets, merges them with existing ones, and wraps the operation in a transaction.
// We should store the following information in the DB:
// - User's RPC providers
@ -209,6 +294,16 @@ func (nm *Manager) SetActive(chainID uint64, active bool) error {
if err != nil {
return errors.CreateErrorResponseFromError(fmt.Errorf("failed to persist active status: %w", err))
}
activatedNetworks := []*params.Network{}
deactivatedNetworks := []*params.Network{}
if active {
activatedNetworks = append(activatedNetworks, network)
} else {
deactivatedNetworks = append(deactivatedNetworks, network)
}
nm.notifyActiveNetworksChange(activatedNetworks, deactivatedNetworks)
return nil
}

View File

@ -30,7 +30,7 @@ func (s *NetworkManagerTestSuite) SetupTest() {
s.Require().NoError(err)
s.db = testDb
s.cleanup = func() { s.Require().NoError(cleanup()) }
s.manager = network.NewManager(testDb)
s.manager = network.NewManager(testDb, nil, nil, nil)
persistence := db.NewNetworksPersistence(testDb)
// Use testutil to initialize networks

View File

@ -0,0 +1,21 @@
package networksevent
// EventType type for event types.
type EventType string
// Event is a type for networks events.
type Event struct {
Type EventType `json:"type"`
ActiveNetworksChangedParams *ActiveNetworksChangedParams `json:"activeNetworksChangedParams,omitempty"` // Only for EventTypeActiveNetworksChanged
}
const (
// EventTypeActiveNetworksChanged is emitted when networks are activated/deactivated. This includes Testnet mode change.
EventTypeActiveNetworksChanged EventType = "active-networks-changed"
)
type ActiveNetworksChangedParams struct {
ActivatedChainIDs []uint64 `json:"activatedChainIds"`
DeactivatedChainIDs []uint64 `json:"deactivatedChainIds"`
CurrentActiveChainIDs []uint64 `json:"currentActiveChainIds"`
}

View File

@ -0,0 +1,86 @@
package networksevent
import (
"context"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/services/wallet/async"
)
type EventCallbacks struct {
ActiveNetworksChangeCb ActiveNetworksChangeCb
}
type ActiveNetworksChangeCb func(params *ActiveNetworksChangedParams)
// Watcher executes a given callback whenever a network event is emitted
type Watcher struct {
accountFeed *event.Feed
group *async.Group
callbacks EventCallbacks
}
func NewWatcher(accountFeed *event.Feed, callbacks EventCallbacks) *Watcher {
return &Watcher{
accountFeed: accountFeed,
callbacks: callbacks,
}
}
func (w *Watcher) Start() {
if w.group != nil {
return
}
w.group = async.NewGroup(context.Background())
w.group.Add(func(ctx context.Context) error {
return watch(ctx, w.accountFeed, w.callbacks)
})
}
func (w *Watcher) Stop() {
if w.group != nil {
w.group.Stop()
w.group.Wait()
w.group = nil
}
}
func onActiveNetworksChange(callback ActiveNetworksChangeCb, params *ActiveNetworksChangedParams) {
if callback == nil {
return
}
if params == nil {
logutils.ZapLogger().Error("no params in event EventTypeActiveNetworksChanged")
return
}
callback(params)
}
func watch(ctx context.Context, accountFeed *event.Feed, callbacks EventCallbacks) error {
ch := make(chan Event, 1)
sub := accountFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
return nil
case err := <-sub.Err():
if err != nil {
logutils.ZapLogger().Error("network events watcher subscription failed", zap.Error(err))
}
case ev := <-ch:
switch ev.Type {
case EventTypeActiveNetworksChanged:
onActiveNetworksChange(callbacks.ActiveNetworksChangeCb, ev.ActiveNetworksChangedParams)
}
}
}
}

View File

@ -66,7 +66,7 @@ func setupCommand(t *testing.T, method string) (state testState, close func()) {
state.db, closeDb = createDB(t)
state.walletDb, closeWalletDb = createWalletDB(t)
networkManager := network.NewManager(state.db)
networkManager := network.NewManager(state.db, nil, nil, nil)
require.NotNil(t, networkManager)
err := networkManager.InitEmbeddedNetworks([]params.Network{

View File

@ -70,7 +70,7 @@ func setupTests(t *testing.T) (state testState, close func()) {
state.mockCtrl = gomock.NewController(t)
state.rpcClient = mock_rpcclient.NewMockClientInterface(state.mockCtrl)
networkManager := network.NewManager(state.db)
networkManager := network.NewManager(state.db, nil, nil, nil)
require.NotNil(t, networkManager)
err = networkManager.InitEmbeddedNetworks([]params.Network{

View File

@ -29,7 +29,7 @@ func TestKeycardPairingsFile(t *testing.T) {
accountFeed := &event.Feed{}
service := NewService(db, accountsDb, appDB, &rpc.Client{NetworkManager: network.NewManager(db)}, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, "")
service := NewService(db, accountsDb, appDB, &rpc.Client{NetworkManager: network.NewManager(db, nil, nil, nil)}, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, "")
data, err := service.KeycardPairings().GetPairingsJSONFileContent()
require.NoError(t, err)

View File

@ -55,7 +55,7 @@ func NewService(
appDB *sql.DB,
rpcClient *rpc.Client,
accountFeed *event.Feed,
settingsFeed *event.Feed,
networksFeed *event.Feed,
gethManager *account.GethManager,
transactor *transactions.Transactor,
config *params.NodeConfig,

View File

@ -225,7 +225,7 @@ func TestGetCachedBalancesByChain(t *testing.T) {
require.NoError(t, persistence.SaveTokens(tokens))
tokenManager := NewTokenManager(db, nil, community.NewManager(db, nil, nil), network.NewManager(db), db, nil, nil, nil, nil, persistence)
tokenManager := NewTokenManager(db, nil, community.NewManager(db, nil, nil), network.NewManager(db, nil, nil, nil), db, nil, nil, nil, nil, persistence)
// Verify that the token balance was inserted correctly
var count int

View File

@ -342,7 +342,7 @@ func Test_removeTokenBalanceOnEventAccountRemoved(t *testing.T) {
rpcClient, _ := rpc.NewClient(config)
rpcClient.UpstreamChainID = chainID
nm := network.NewManager(appDB)
nm := network.NewManager(appDB, nil, nil, nil)
mediaServer, err := mediaserver.NewMediaServer(appDB, nil, nil, walletDB)
require.NoError(t, err)
@ -406,7 +406,7 @@ func Test_tokensListsValidity(t *testing.T) {
accountsDB, err := accounts.NewDB(appDB)
require.NoError(t, err)
nm := network.NewManager(appDB)
nm := network.NewManager(appDB, nil, nil, nil)
manager := NewTokenManager(walletDB, nil, nil, nm, appDB, nil, nil, nil, accountsDB, NewPersistence(walletDB))
require.NotNil(t, manager)

View File

@ -1091,7 +1091,7 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo
require.NoError(t, err)
client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb, nil, nil, nil), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager.SetTokens([]*token.Token{
{
Address: tokenTXXAddress,
@ -1361,7 +1361,7 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb, nil, nil, nil), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager.SetTokens([]*token.Token{
{
@ -1492,7 +1492,7 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) {
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb, nil, nil, nil), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager.SetTokens([]*token.Token{
{
@ -1580,7 +1580,7 @@ func TestFetchNewBlocksCommand_nonceDetection(t *testing.T) {
client, _ := statusRpc.NewClient(config)
client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb, nil, nil, nil), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
wdb := NewDB(db)
blockChannel := make(chan []*DBHeader, 10)
@ -1703,7 +1703,7 @@ func TestFetchNewBlocksCommand(t *testing.T) {
client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb, nil, nil, nil), appdb, mediaServer, nil, nil, nil, token.NewPersistence(db))
tokenManager.SetTokens([]*token.Token{
{