From 3d4c3d2dd543e25cc5753071b33d0da92ab76c91 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Thu, 12 Sep 2024 15:00:26 +0100 Subject: [PATCH] wip --- cmd/statusd/main.go | 398 +++++++++--------- cmd/statusd/rest/InitializeApplication.rest | 6 + cmd/statusd/rest/signals.rest | 2 + cmd/statusd/server/signals_server.go | 35 +- .../transfer/commands_sequential_test.go | 95 +++-- 5 files changed, 281 insertions(+), 255 deletions(-) create mode 100644 cmd/statusd/rest/InitializeApplication.rest create mode 100644 cmd/statusd/rest/signals.rest diff --git a/cmd/statusd/main.go b/cmd/statusd/main.go index d22ace42f..a758faaee 100644 --- a/cmd/statusd/main.go +++ b/cmd/statusd/main.go @@ -19,23 +19,15 @@ import ( "golang.org/x/crypto/ssh/terminal" "github.com/ethereum/go-ethereum/log" - gethmetrics "github.com/ethereum/go-ethereum/metrics" - "github.com/status-im/status-go/api" "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/cmd/statusd/server" "github.com/status-im/status-go/common/dbsetup" - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" - "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/metrics" nodemetrics "github.com/status-im/status-go/metrics/node" "github.com/status-im/status-go/node" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/profiling" "github.com/status-im/status-go/protocol" - "github.com/status-im/status-go/protocol/pushnotificationserver" - "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/walletdatabase" ) @@ -186,202 +178,202 @@ func main() { }() } - backend := api.NewGethStatusBackend() - if config.NodeKey == "" { - logger.Error("node key needs to be set if running a push notification server") - return - } + //backend := api.NewGethStatusBackend() + //if config.NodeKey == "" { + // logger.Error("node key needs to be set if running a push notification server") + // return + //} + // + //identity, err := crypto.HexToECDSA(config.NodeKey) + //if err != nil { + // logger.Error("node key is invalid", "error", err) + // return + //} + // + //// Generate installationID from public key, so it's always the same + //installationID, err := uuid.FromBytes(crypto.CompressPubkey(&identity.PublicKey)[:16]) + //if err != nil { + // logger.Error("cannot create installation id", "error", err) + // return + //} + // + //if *seedPhrase != "" { + // // Remove data inside dir to avoid conflicts with existing data or account restoration fails + // if err := os.RemoveAll(config.DataDir); err != nil { + // logger.Error("failed to remove data dir", "error", err) + // return + // } + // + // if err := createDirsFromConfig(config); err != nil { + // logger.Error("failed to create directories", "error", err) + // return + // } + // + // request := requests.RestoreAccount{ + // Mnemonic: *seedPhrase, + // FetchBackup: false, + // CreateAccount: requests.CreateAccount{ + // DisplayName: "Account1", + // DeviceName: "StatusIM", + // Password: *password, + // CustomizationColor: "0x000000", + // RootDataDir: config.DataDir, + // APIConfig: &requests.APIConfig{ + // ConnectorEnabled: config.ClusterConfig.Enabled, + // HTTPEnabled: config.HTTPEnabled, + // HTTPHost: config.HTTPHost, + // HTTPPort: config.HTTPPort, + // HTTPVirtualHosts: config.HTTPVirtualHosts, + // WSEnabled: config.WSEnabled, + // WSHost: config.WSHost, + // WSPort: config.WSPort, + // APIModules: config.APIModules, + // }, + // NetworkID: &config.NetworkID, + // TestOverrideNetworks: config.Networks, + // }, + // } + // + // api.OverrideApiConfigTest() + // + // _, err := backend.RestoreAccountAndLogin(&request) + // if err != nil { + // logger.Error("failed to import account", "error", err) + // return + // } + // + // appDB, walletDB, err := openDatabases(config.DataDir + "/" + installationID.String()) + // if err != nil { + // log.Error("failed to open databases") + // return + // } + // + // options := []protocol.Option{ + // protocol.WithDatabase(appDB), + // protocol.WithWalletDatabase(walletDB), + // protocol.WithTorrentConfig(&config.TorrentConfig), + // protocol.WithWalletConfig(&config.WalletConfig), + // protocol.WithAccountManager(backend.AccountManager()), + // } + // + // messenger, err := protocol.NewMessenger( + // config.Name, + // identity, + // gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), + // installationID.String(), + // nil, + // config.Version, + // options..., + // ) + // + // if err != nil { + // logger.Error("failed to create messenger", "error", err) + // return + // } + // + // _, err = messenger.Start() + // if err != nil { + // logger.Error("failed to start messenger", "error", err) + // return + // } + // + // interruptCh := haltOnInterruptSignal(backend.StatusNode()) + // go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh) + // + //} else { + // appDB, walletDB, err := startNode(config, backend, installationID) + // if err != nil { + // logger.Error("failed to start node", "error", err) + // return + // } + // + // err = sdnotify.Ready() + // if err == sdnotify.ErrSdNotifyNoSocket { + // logger.Debug("sd_notify socket not available") + // } else if err != nil { + // logger.Warn("sd_notify READY call failed", "error", err) + // } else { + // // systemd aliveness notifications, affects only Linux + // go startSystemDWatchdog() + // } + // + // // handle interrupt signals + // interruptCh := haltOnInterruptSignal(backend.StatusNode()) + // + // // Start collecting metrics. Metrics can be enabled by providing `-metrics` flag + // // or setting `gethmetrics.Enabled` to true during compilation time: + // // https://github.com/status-im/go-ethereum/pull/76. + // if *metricsEnabled || gethmetrics.Enabled { + // go startCollectingNodeMetrics(interruptCh, backend.StatusNode()) + // go gethmetrics.CollectProcessMetrics(3 * time.Second) + // go metrics.NewMetricsServer(*metricsPort, gethmetrics.DefaultRegistry).Listen() + // } + // + // // Check if profiling shall be enabled. + // if *pprofEnabled { + // profiling.NewProfiler(*pprofPort).Go() + // } + // + // if config.PushNotificationServerConfig.Enabled { + // options := []protocol.Option{ + // protocol.WithPushNotifications(), + // protocol.WithPushNotificationServerConfig(&pushnotificationserver.Config{ + // Enabled: config.PushNotificationServerConfig.Enabled, + // Identity: config.PushNotificationServerConfig.Identity, + // GorushURL: config.PushNotificationServerConfig.GorushURL, + // }), + // protocol.WithDatabase(appDB), + // protocol.WithWalletDatabase(walletDB), + // protocol.WithTorrentConfig(&config.TorrentConfig), + // protocol.WithWalletConfig(&config.WalletConfig), + // protocol.WithAccountManager(backend.AccountManager()), + // } + // + // messenger, err := protocol.NewMessenger( + // config.Name, + // identity, + // gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), + // installationID.String(), + // nil, + // config.Version, + // options..., + // ) + // if err != nil { + // logger.Error("failed to create messenger", "error", err) + // return + // } + // + // err = messenger.InitInstallations() + // if err != nil { + // logger.Error("failed to init messenger installations", "error", err) + // return + // } + // + // err = messenger.InitFilters() + // if err != nil { + // logger.Error("failed to init messenger filters", "error", err) + // return + // } + // + // // This will start the push notification server as well as + // // the config is set to Enabled + // _, err = messenger.Start() + // if err != nil { + // logger.Error("failed to start messenger", "error", err) + // return + // } + // go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh) + // } + //} - identity, err := crypto.HexToECDSA(config.NodeKey) - if err != nil { - logger.Error("node key is invalid", "error", err) - return - } - - // Generate installationID from public key, so it's always the same - installationID, err := uuid.FromBytes(crypto.CompressPubkey(&identity.PublicKey)[:16]) - if err != nil { - logger.Error("cannot create installation id", "error", err) - return - } - - if *seedPhrase != "" { - // Remove data inside dir to avoid conflicts with existing data or account restoration fails - if err := os.RemoveAll(config.DataDir); err != nil { - logger.Error("failed to remove data dir", "error", err) - return - } - - if err := createDirsFromConfig(config); err != nil { - logger.Error("failed to create directories", "error", err) - return - } - - request := requests.RestoreAccount{ - Mnemonic: *seedPhrase, - FetchBackup: false, - CreateAccount: requests.CreateAccount{ - DisplayName: "Account1", - DeviceName: "StatusIM", - Password: *password, - CustomizationColor: "0x000000", - RootDataDir: config.DataDir, - APIConfig: &requests.APIConfig{ - ConnectorEnabled: config.ClusterConfig.Enabled, - HTTPEnabled: config.HTTPEnabled, - HTTPHost: config.HTTPHost, - HTTPPort: config.HTTPPort, - HTTPVirtualHosts: config.HTTPVirtualHosts, - WSEnabled: config.WSEnabled, - WSHost: config.WSHost, - WSPort: config.WSPort, - APIModules: config.APIModules, - }, - NetworkID: &config.NetworkID, - TestOverrideNetworks: config.Networks, - }, - } - - api.OverrideApiConfigTest() - - _, err := backend.RestoreAccountAndLogin(&request) - if err != nil { - logger.Error("failed to import account", "error", err) - return - } - - appDB, walletDB, err := openDatabases(config.DataDir + "/" + installationID.String()) - if err != nil { - log.Error("failed to open databases") - return - } - - options := []protocol.Option{ - protocol.WithDatabase(appDB), - protocol.WithWalletDatabase(walletDB), - protocol.WithTorrentConfig(&config.TorrentConfig), - protocol.WithWalletConfig(&config.WalletConfig), - protocol.WithAccountManager(backend.AccountManager()), - } - - messenger, err := protocol.NewMessenger( - config.Name, - identity, - gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), - installationID.String(), - nil, - config.Version, - options..., - ) - - if err != nil { - logger.Error("failed to create messenger", "error", err) - return - } - - _, err = messenger.Start() - if err != nil { - logger.Error("failed to start messenger", "error", err) - return - } - - interruptCh := haltOnInterruptSignal(backend.StatusNode()) - go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh) - - } else { - appDB, walletDB, err := startNode(config, backend, installationID) - if err != nil { - logger.Error("failed to start node", "error", err) - return - } - - err = sdnotify.Ready() - if err == sdnotify.ErrSdNotifyNoSocket { - logger.Debug("sd_notify socket not available") - } else if err != nil { - logger.Warn("sd_notify READY call failed", "error", err) - } else { - // systemd aliveness notifications, affects only Linux - go startSystemDWatchdog() - } - - // handle interrupt signals - interruptCh := haltOnInterruptSignal(backend.StatusNode()) - - // Start collecting metrics. Metrics can be enabled by providing `-metrics` flag - // or setting `gethmetrics.Enabled` to true during compilation time: - // https://github.com/status-im/go-ethereum/pull/76. - if *metricsEnabled || gethmetrics.Enabled { - go startCollectingNodeMetrics(interruptCh, backend.StatusNode()) - go gethmetrics.CollectProcessMetrics(3 * time.Second) - go metrics.NewMetricsServer(*metricsPort, gethmetrics.DefaultRegistry).Listen() - } - - // Check if profiling shall be enabled. - if *pprofEnabled { - profiling.NewProfiler(*pprofPort).Go() - } - - if config.PushNotificationServerConfig.Enabled { - options := []protocol.Option{ - protocol.WithPushNotifications(), - protocol.WithPushNotificationServerConfig(&pushnotificationserver.Config{ - Enabled: config.PushNotificationServerConfig.Enabled, - Identity: config.PushNotificationServerConfig.Identity, - GorushURL: config.PushNotificationServerConfig.GorushURL, - }), - protocol.WithDatabase(appDB), - protocol.WithWalletDatabase(walletDB), - protocol.WithTorrentConfig(&config.TorrentConfig), - protocol.WithWalletConfig(&config.WalletConfig), - protocol.WithAccountManager(backend.AccountManager()), - } - - messenger, err := protocol.NewMessenger( - config.Name, - identity, - gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), - installationID.String(), - nil, - config.Version, - options..., - ) - if err != nil { - logger.Error("failed to create messenger", "error", err) - return - } - - err = messenger.InitInstallations() - if err != nil { - logger.Error("failed to init messenger installations", "error", err) - return - } - - err = messenger.InitFilters() - if err != nil { - logger.Error("failed to init messenger filters", "error", err) - return - } - - // This will start the push notification server as well as - // the config is set to Enabled - _, err = messenger.Start() - if err != nil { - logger.Error("failed to start messenger", "error", err) - return - } - go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh) - } - } - - gethNode := backend.StatusNode().GethNode() - if gethNode != nil { - // wait till node has been stopped - gethNode.Wait() - if err := sdnotify.Stopping(); err != nil { - logger.Warn("sd_notify STOPPING call failed", "error", err) - } - } + //gethNode := backend.StatusNode().GethNode() + //if gethNode != nil { + // // wait till node has been stopped + // gethNode.Wait() + // if err := sdnotify.Stopping(); err != nil { + // logger.Warn("sd_notify STOPPING call failed", "error", err) + // } + //} } func getDefaultDataDir() string { diff --git a/cmd/statusd/rest/InitializeApplication.rest b/cmd/statusd/rest/InitializeApplication.rest new file mode 100644 index 000000000..47d084ff5 --- /dev/null +++ b/cmd/statusd/rest/InitializeApplication.rest @@ -0,0 +1,6 @@ +GET http://localhost:12345/InitializeApplication +Content-Type: application/json + +{ + "dataDir": "/Users/igorsirotin/Repositories/Status/status-desktop/Status/data" +} diff --git a/cmd/statusd/rest/signals.rest b/cmd/statusd/rest/signals.rest new file mode 100644 index 000000000..11ed7a4c3 --- /dev/null +++ b/cmd/statusd/rest/signals.rest @@ -0,0 +1,2 @@ +WEBSOCKET ws://localhost:12345/signals +Content-Type: application/json diff --git a/cmd/statusd/server/signals_server.go b/cmd/statusd/server/signals_server.go index 539a834f3..722014649 100644 --- a/cmd/statusd/server/signals_server.go +++ b/cmd/statusd/server/signals_server.go @@ -12,7 +12,9 @@ import ( "github.com/ethereum/go-ethereum/log" + statusgo "github.com/status-im/status-go/mobile" "github.com/status-im/status-go/signal" + "io" ) type Server struct { @@ -59,6 +61,10 @@ func (s *Server) Listen(address string) error { } http.HandleFunc("/signals", s.signals) + s.addStatusGoEndpoint("/InitializeApplication", statusgo.InitializeApplication) + s.addStatusGoEndpoint("/CreateAccountAndLogin", statusgo.CreateAccountAndLogin) + s.addStatusGoEndpoint("/RestoreAccountAndLogin", statusgo.RestoreAccountAndLogin) + s.addStatusGoEndpoint("/LoginAccount", statusgo.LoginAccount) listener, err := net.Listen("tcp", address) if err != nil { @@ -67,12 +73,12 @@ func (s *Server) Listen(address string) error { s.address = listener.Addr().String() - go func() { - err := s.server.Serve(listener) - if !errors.Is(err, http.ErrServerClosed) { - log.Error("signals server closed with error: %w", err) - } - }() + //go func() { + err = s.server.Serve(listener) + if !errors.Is(err, http.ErrServerClosed) { + log.Error("signals server closed with error: %w", err) + } + //}() return nil } @@ -113,3 +119,20 @@ func (s *Server) signals(w http.ResponseWriter, r *http.Request) { s.connections[connection] = struct{}{} } + +func (s *SignalsServer) addStatusGoEndpoint(endpoint string, handler func(string) string) { + http.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) { + request, err := io.ReadAll(r.Body) + if err != nil { + log.Error("failed to read request: %w", err) + return + } + + response := handler(string(request)) + + _, err = w.Write([]byte(response)) + if err != nil { + log.Error("failed to write response: %w", err) + } + }) +} diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 43210092c..5352c11f3 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -1120,63 +1120,66 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo func TestFindBlocksCommand(t *testing.T) { for idx, testCase := range getCases() { - t.Log("case #", idx+1) + t.Run(fmt.Sprintf("case #%d", idx+1), func(t *testing.T) { - accountAddress := common.HexToAddress("0x1234") - rangeSize := 20 - if testCase.rangeSize != 0 { - rangeSize = testCase.rangeSize - } + t.Log("case #", idx+1) - balances := map[common.Address][][]int{accountAddress: testCase.balanceChanges} - outgoingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC20Transfers} - incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC20Transfers} - outgoingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC1155SingleTransfers} - incomingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC1155SingleTransfers} + accountAddress := common.HexToAddress("0x1234") + rangeSize := 20 + if testCase.rangeSize != 0 { + rangeSize = testCase.rangeSize + } - fbc, tc, blockChannel, blockRangeDAO := setupFindBlocksCommand(t, accountAddress, big.NewInt(testCase.fromBlock), big.NewInt(testCase.toBlock), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, outgoingERC1155SingleTransfers, incomingERC1155SingleTransfers) - ctx := context.Background() - group := async.NewGroup(ctx) - group.Add(fbc.Command()) + balances := map[common.Address][][]int{accountAddress: testCase.balanceChanges} + outgoingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC20Transfers} + incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC20Transfers} + outgoingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC1155SingleTransfers} + incomingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC1155SingleTransfers} - foundBlocks := []*DBHeader{} - select { - case <-ctx.Done(): - t.Log("ERROR") - case <-group.WaitAsync(): - close(blockChannel) - for { - bloks, ok := <-blockChannel - if !ok { - break + fbc, tc, blockChannel, blockRangeDAO := setupFindBlocksCommand(t, accountAddress, big.NewInt(testCase.fromBlock), big.NewInt(testCase.toBlock), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, outgoingERC1155SingleTransfers, incomingERC1155SingleTransfers) + ctx := context.Background() + group := async.NewGroup(ctx) + group.Add(fbc.Command()) + + var foundBlocks []*DBHeader + select { + case <-ctx.Done(): + t.Log("ERROR") + case <-group.WaitAsync(): + close(blockChannel) + for { + blocks, ok := <-blockChannel + if !ok { + break + } + foundBlocks = append(foundBlocks, blocks...) } - foundBlocks = append(foundBlocks, bloks...) - } - numbers := []int64{} - for _, block := range foundBlocks { - numbers = append(numbers, block.Number.Int64()) - } + numbers := []int64{} + for _, block := range foundBlocks { + numbers = append(numbers, block.Number.Int64()) + } - if tc.traceAPICalls { - tc.printCounter() - } + if tc.traceAPICalls { + tc.printCounter() + } - for name, cnt := range testCase.expectedCalls { - require.Equal(t, cnt, tc.callsCounter[name], "calls to "+name) - } + for name, cnt := range testCase.expectedCalls { + require.Equal(t, cnt, tc.callsCounter[name], "calls to "+name) + } - sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] }) - require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers) + sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] }) + require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers) - blRange, _, err := blockRangeDAO.getBlockRange(tc.NetworkID(), accountAddress) - require.NoError(t, err) - require.NotNil(t, blRange.eth.FirstKnown) - require.NotNil(t, blRange.tokens.FirstKnown) - if testCase.fromBlock == 0 { - require.Equal(t, 0, blRange.tokens.FirstKnown.Cmp(zero)) + blRange, _, err := blockRangeDAO.getBlockRange(tc.NetworkID(), accountAddress) + require.NoError(t, err) + require.NotNil(t, blRange.eth.FirstKnown) + require.NotNil(t, blRange.tokens.FirstKnown) + if testCase.fromBlock == 0 { + require.Equal(t, 0, blRange.tokens.FirstKnown.Cmp(zero)) + } } - } + }) } }