This commit is contained in:
Igor Sirotin 2024-09-12 15:00:26 +01:00
parent 9038c66819
commit 3d4c3d2dd5
No known key found for this signature in database
GPG Key ID: 425E227CAAB81F95
5 changed files with 281 additions and 255 deletions

View File

@ -19,23 +19,15 @@ import (
"golang.org/x/crypto/ssh/terminal"
"github.com/ethereum/go-ethereum/log"
gethmetrics "github.com/ethereum/go-ethereum/metrics"
"github.com/status-im/status-go/api"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/cmd/statusd/server"
"github.com/status-im/status-go/common/dbsetup"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/metrics"
nodemetrics "github.com/status-im/status-go/metrics/node"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/profiling"
"github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/pushnotificationserver"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/walletdatabase"
)
@ -186,202 +178,202 @@ func main() {
}()
}
backend := api.NewGethStatusBackend()
if config.NodeKey == "" {
logger.Error("node key needs to be set if running a push notification server")
return
}
//backend := api.NewGethStatusBackend()
//if config.NodeKey == "" {
// logger.Error("node key needs to be set if running a push notification server")
// return
//}
//
//identity, err := crypto.HexToECDSA(config.NodeKey)
//if err != nil {
// logger.Error("node key is invalid", "error", err)
// return
//}
//
//// Generate installationID from public key, so it's always the same
//installationID, err := uuid.FromBytes(crypto.CompressPubkey(&identity.PublicKey)[:16])
//if err != nil {
// logger.Error("cannot create installation id", "error", err)
// return
//}
//
//if *seedPhrase != "" {
// // Remove data inside dir to avoid conflicts with existing data or account restoration fails
// if err := os.RemoveAll(config.DataDir); err != nil {
// logger.Error("failed to remove data dir", "error", err)
// return
// }
//
// if err := createDirsFromConfig(config); err != nil {
// logger.Error("failed to create directories", "error", err)
// return
// }
//
// request := requests.RestoreAccount{
// Mnemonic: *seedPhrase,
// FetchBackup: false,
// CreateAccount: requests.CreateAccount{
// DisplayName: "Account1",
// DeviceName: "StatusIM",
// Password: *password,
// CustomizationColor: "0x000000",
// RootDataDir: config.DataDir,
// APIConfig: &requests.APIConfig{
// ConnectorEnabled: config.ClusterConfig.Enabled,
// HTTPEnabled: config.HTTPEnabled,
// HTTPHost: config.HTTPHost,
// HTTPPort: config.HTTPPort,
// HTTPVirtualHosts: config.HTTPVirtualHosts,
// WSEnabled: config.WSEnabled,
// WSHost: config.WSHost,
// WSPort: config.WSPort,
// APIModules: config.APIModules,
// },
// NetworkID: &config.NetworkID,
// TestOverrideNetworks: config.Networks,
// },
// }
//
// api.OverrideApiConfigTest()
//
// _, err := backend.RestoreAccountAndLogin(&request)
// if err != nil {
// logger.Error("failed to import account", "error", err)
// return
// }
//
// appDB, walletDB, err := openDatabases(config.DataDir + "/" + installationID.String())
// if err != nil {
// log.Error("failed to open databases")
// return
// }
//
// options := []protocol.Option{
// protocol.WithDatabase(appDB),
// protocol.WithWalletDatabase(walletDB),
// protocol.WithTorrentConfig(&config.TorrentConfig),
// protocol.WithWalletConfig(&config.WalletConfig),
// protocol.WithAccountManager(backend.AccountManager()),
// }
//
// messenger, err := protocol.NewMessenger(
// config.Name,
// identity,
// gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
// installationID.String(),
// nil,
// config.Version,
// options...,
// )
//
// if err != nil {
// logger.Error("failed to create messenger", "error", err)
// return
// }
//
// _, err = messenger.Start()
// if err != nil {
// logger.Error("failed to start messenger", "error", err)
// return
// }
//
// interruptCh := haltOnInterruptSignal(backend.StatusNode())
// go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh)
//
//} else {
// appDB, walletDB, err := startNode(config, backend, installationID)
// if err != nil {
// logger.Error("failed to start node", "error", err)
// return
// }
//
// err = sdnotify.Ready()
// if err == sdnotify.ErrSdNotifyNoSocket {
// logger.Debug("sd_notify socket not available")
// } else if err != nil {
// logger.Warn("sd_notify READY call failed", "error", err)
// } else {
// // systemd aliveness notifications, affects only Linux
// go startSystemDWatchdog()
// }
//
// // handle interrupt signals
// interruptCh := haltOnInterruptSignal(backend.StatusNode())
//
// // Start collecting metrics. Metrics can be enabled by providing `-metrics` flag
// // or setting `gethmetrics.Enabled` to true during compilation time:
// // https://github.com/status-im/go-ethereum/pull/76.
// if *metricsEnabled || gethmetrics.Enabled {
// go startCollectingNodeMetrics(interruptCh, backend.StatusNode())
// go gethmetrics.CollectProcessMetrics(3 * time.Second)
// go metrics.NewMetricsServer(*metricsPort, gethmetrics.DefaultRegistry).Listen()
// }
//
// // Check if profiling shall be enabled.
// if *pprofEnabled {
// profiling.NewProfiler(*pprofPort).Go()
// }
//
// if config.PushNotificationServerConfig.Enabled {
// options := []protocol.Option{
// protocol.WithPushNotifications(),
// protocol.WithPushNotificationServerConfig(&pushnotificationserver.Config{
// Enabled: config.PushNotificationServerConfig.Enabled,
// Identity: config.PushNotificationServerConfig.Identity,
// GorushURL: config.PushNotificationServerConfig.GorushURL,
// }),
// protocol.WithDatabase(appDB),
// protocol.WithWalletDatabase(walletDB),
// protocol.WithTorrentConfig(&config.TorrentConfig),
// protocol.WithWalletConfig(&config.WalletConfig),
// protocol.WithAccountManager(backend.AccountManager()),
// }
//
// messenger, err := protocol.NewMessenger(
// config.Name,
// identity,
// gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
// installationID.String(),
// nil,
// config.Version,
// options...,
// )
// if err != nil {
// logger.Error("failed to create messenger", "error", err)
// return
// }
//
// err = messenger.InitInstallations()
// if err != nil {
// logger.Error("failed to init messenger installations", "error", err)
// return
// }
//
// err = messenger.InitFilters()
// if err != nil {
// logger.Error("failed to init messenger filters", "error", err)
// return
// }
//
// // This will start the push notification server as well as
// // the config is set to Enabled
// _, err = messenger.Start()
// if err != nil {
// logger.Error("failed to start messenger", "error", err)
// return
// }
// go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh)
// }
//}
identity, err := crypto.HexToECDSA(config.NodeKey)
if err != nil {
logger.Error("node key is invalid", "error", err)
return
}
// Generate installationID from public key, so it's always the same
installationID, err := uuid.FromBytes(crypto.CompressPubkey(&identity.PublicKey)[:16])
if err != nil {
logger.Error("cannot create installation id", "error", err)
return
}
if *seedPhrase != "" {
// Remove data inside dir to avoid conflicts with existing data or account restoration fails
if err := os.RemoveAll(config.DataDir); err != nil {
logger.Error("failed to remove data dir", "error", err)
return
}
if err := createDirsFromConfig(config); err != nil {
logger.Error("failed to create directories", "error", err)
return
}
request := requests.RestoreAccount{
Mnemonic: *seedPhrase,
FetchBackup: false,
CreateAccount: requests.CreateAccount{
DisplayName: "Account1",
DeviceName: "StatusIM",
Password: *password,
CustomizationColor: "0x000000",
RootDataDir: config.DataDir,
APIConfig: &requests.APIConfig{
ConnectorEnabled: config.ClusterConfig.Enabled,
HTTPEnabled: config.HTTPEnabled,
HTTPHost: config.HTTPHost,
HTTPPort: config.HTTPPort,
HTTPVirtualHosts: config.HTTPVirtualHosts,
WSEnabled: config.WSEnabled,
WSHost: config.WSHost,
WSPort: config.WSPort,
APIModules: config.APIModules,
},
NetworkID: &config.NetworkID,
TestOverrideNetworks: config.Networks,
},
}
api.OverrideApiConfigTest()
_, err := backend.RestoreAccountAndLogin(&request)
if err != nil {
logger.Error("failed to import account", "error", err)
return
}
appDB, walletDB, err := openDatabases(config.DataDir + "/" + installationID.String())
if err != nil {
log.Error("failed to open databases")
return
}
options := []protocol.Option{
protocol.WithDatabase(appDB),
protocol.WithWalletDatabase(walletDB),
protocol.WithTorrentConfig(&config.TorrentConfig),
protocol.WithWalletConfig(&config.WalletConfig),
protocol.WithAccountManager(backend.AccountManager()),
}
messenger, err := protocol.NewMessenger(
config.Name,
identity,
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
installationID.String(),
nil,
config.Version,
options...,
)
if err != nil {
logger.Error("failed to create messenger", "error", err)
return
}
_, err = messenger.Start()
if err != nil {
logger.Error("failed to start messenger", "error", err)
return
}
interruptCh := haltOnInterruptSignal(backend.StatusNode())
go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh)
} else {
appDB, walletDB, err := startNode(config, backend, installationID)
if err != nil {
logger.Error("failed to start node", "error", err)
return
}
err = sdnotify.Ready()
if err == sdnotify.ErrSdNotifyNoSocket {
logger.Debug("sd_notify socket not available")
} else if err != nil {
logger.Warn("sd_notify READY call failed", "error", err)
} else {
// systemd aliveness notifications, affects only Linux
go startSystemDWatchdog()
}
// handle interrupt signals
interruptCh := haltOnInterruptSignal(backend.StatusNode())
// Start collecting metrics. Metrics can be enabled by providing `-metrics` flag
// or setting `gethmetrics.Enabled` to true during compilation time:
// https://github.com/status-im/go-ethereum/pull/76.
if *metricsEnabled || gethmetrics.Enabled {
go startCollectingNodeMetrics(interruptCh, backend.StatusNode())
go gethmetrics.CollectProcessMetrics(3 * time.Second)
go metrics.NewMetricsServer(*metricsPort, gethmetrics.DefaultRegistry).Listen()
}
// Check if profiling shall be enabled.
if *pprofEnabled {
profiling.NewProfiler(*pprofPort).Go()
}
if config.PushNotificationServerConfig.Enabled {
options := []protocol.Option{
protocol.WithPushNotifications(),
protocol.WithPushNotificationServerConfig(&pushnotificationserver.Config{
Enabled: config.PushNotificationServerConfig.Enabled,
Identity: config.PushNotificationServerConfig.Identity,
GorushURL: config.PushNotificationServerConfig.GorushURL,
}),
protocol.WithDatabase(appDB),
protocol.WithWalletDatabase(walletDB),
protocol.WithTorrentConfig(&config.TorrentConfig),
protocol.WithWalletConfig(&config.WalletConfig),
protocol.WithAccountManager(backend.AccountManager()),
}
messenger, err := protocol.NewMessenger(
config.Name,
identity,
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
installationID.String(),
nil,
config.Version,
options...,
)
if err != nil {
logger.Error("failed to create messenger", "error", err)
return
}
err = messenger.InitInstallations()
if err != nil {
logger.Error("failed to init messenger installations", "error", err)
return
}
err = messenger.InitFilters()
if err != nil {
logger.Error("failed to init messenger filters", "error", err)
return
}
// This will start the push notification server as well as
// the config is set to Enabled
_, err = messenger.Start()
if err != nil {
logger.Error("failed to start messenger", "error", err)
return
}
go retrieveMessagesLoop(messenger, 300*time.Millisecond, interruptCh)
}
}
gethNode := backend.StatusNode().GethNode()
if gethNode != nil {
// wait till node has been stopped
gethNode.Wait()
if err := sdnotify.Stopping(); err != nil {
logger.Warn("sd_notify STOPPING call failed", "error", err)
}
}
//gethNode := backend.StatusNode().GethNode()
//if gethNode != nil {
// // wait till node has been stopped
// gethNode.Wait()
// if err := sdnotify.Stopping(); err != nil {
// logger.Warn("sd_notify STOPPING call failed", "error", err)
// }
//}
}
func getDefaultDataDir() string {

View File

@ -0,0 +1,6 @@
GET http://localhost:12345/InitializeApplication
Content-Type: application/json
{
"dataDir": "/Users/igorsirotin/Repositories/Status/status-desktop/Status/data"
}

View File

@ -0,0 +1,2 @@
WEBSOCKET ws://localhost:12345/signals
Content-Type: application/json

View File

@ -12,7 +12,9 @@ import (
"github.com/ethereum/go-ethereum/log"
statusgo "github.com/status-im/status-go/mobile"
"github.com/status-im/status-go/signal"
"io"
)
type Server struct {
@ -59,6 +61,10 @@ func (s *Server) Listen(address string) error {
}
http.HandleFunc("/signals", s.signals)
s.addStatusGoEndpoint("/InitializeApplication", statusgo.InitializeApplication)
s.addStatusGoEndpoint("/CreateAccountAndLogin", statusgo.CreateAccountAndLogin)
s.addStatusGoEndpoint("/RestoreAccountAndLogin", statusgo.RestoreAccountAndLogin)
s.addStatusGoEndpoint("/LoginAccount", statusgo.LoginAccount)
listener, err := net.Listen("tcp", address)
if err != nil {
@ -67,12 +73,12 @@ func (s *Server) Listen(address string) error {
s.address = listener.Addr().String()
go func() {
err := s.server.Serve(listener)
if !errors.Is(err, http.ErrServerClosed) {
log.Error("signals server closed with error: %w", err)
}
}()
//go func() {
err = s.server.Serve(listener)
if !errors.Is(err, http.ErrServerClosed) {
log.Error("signals server closed with error: %w", err)
}
//}()
return nil
}
@ -113,3 +119,20 @@ func (s *Server) signals(w http.ResponseWriter, r *http.Request) {
s.connections[connection] = struct{}{}
}
func (s *SignalsServer) addStatusGoEndpoint(endpoint string, handler func(string) string) {
http.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
request, err := io.ReadAll(r.Body)
if err != nil {
log.Error("failed to read request: %w", err)
return
}
response := handler(string(request))
_, err = w.Write([]byte(response))
if err != nil {
log.Error("failed to write response: %w", err)
}
})
}

View File

@ -1120,63 +1120,66 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo
func TestFindBlocksCommand(t *testing.T) {
for idx, testCase := range getCases() {
t.Log("case #", idx+1)
t.Run(fmt.Sprintf("case #%d", idx+1), func(t *testing.T) {
accountAddress := common.HexToAddress("0x1234")
rangeSize := 20
if testCase.rangeSize != 0 {
rangeSize = testCase.rangeSize
}
t.Log("case #", idx+1)
balances := map[common.Address][][]int{accountAddress: testCase.balanceChanges}
outgoingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC20Transfers}
incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC20Transfers}
outgoingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC1155SingleTransfers}
incomingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC1155SingleTransfers}
accountAddress := common.HexToAddress("0x1234")
rangeSize := 20
if testCase.rangeSize != 0 {
rangeSize = testCase.rangeSize
}
fbc, tc, blockChannel, blockRangeDAO := setupFindBlocksCommand(t, accountAddress, big.NewInt(testCase.fromBlock), big.NewInt(testCase.toBlock), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, outgoingERC1155SingleTransfers, incomingERC1155SingleTransfers)
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(fbc.Command())
balances := map[common.Address][][]int{accountAddress: testCase.balanceChanges}
outgoingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC20Transfers}
incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC20Transfers}
outgoingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.outgoingERC1155SingleTransfers}
incomingERC1155SingleTransfers := map[common.Address][]testERC20Transfer{accountAddress: testCase.incomingERC1155SingleTransfers}
foundBlocks := []*DBHeader{}
select {
case <-ctx.Done():
t.Log("ERROR")
case <-group.WaitAsync():
close(blockChannel)
for {
bloks, ok := <-blockChannel
if !ok {
break
fbc, tc, blockChannel, blockRangeDAO := setupFindBlocksCommand(t, accountAddress, big.NewInt(testCase.fromBlock), big.NewInt(testCase.toBlock), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, outgoingERC1155SingleTransfers, incomingERC1155SingleTransfers)
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(fbc.Command())
var foundBlocks []*DBHeader
select {
case <-ctx.Done():
t.Log("ERROR")
case <-group.WaitAsync():
close(blockChannel)
for {
blocks, ok := <-blockChannel
if !ok {
break
}
foundBlocks = append(foundBlocks, blocks...)
}
foundBlocks = append(foundBlocks, bloks...)
}
numbers := []int64{}
for _, block := range foundBlocks {
numbers = append(numbers, block.Number.Int64())
}
numbers := []int64{}
for _, block := range foundBlocks {
numbers = append(numbers, block.Number.Int64())
}
if tc.traceAPICalls {
tc.printCounter()
}
if tc.traceAPICalls {
tc.printCounter()
}
for name, cnt := range testCase.expectedCalls {
require.Equal(t, cnt, tc.callsCounter[name], "calls to "+name)
}
for name, cnt := range testCase.expectedCalls {
require.Equal(t, cnt, tc.callsCounter[name], "calls to "+name)
}
sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] })
require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers)
sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] })
require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers)
blRange, _, err := blockRangeDAO.getBlockRange(tc.NetworkID(), accountAddress)
require.NoError(t, err)
require.NotNil(t, blRange.eth.FirstKnown)
require.NotNil(t, blRange.tokens.FirstKnown)
if testCase.fromBlock == 0 {
require.Equal(t, 0, blRange.tokens.FirstKnown.Cmp(zero))
blRange, _, err := blockRangeDAO.getBlockRange(tc.NetworkID(), accountAddress)
require.NoError(t, err)
require.NotNil(t, blRange.eth.FirstKnown)
require.NotNil(t, blRange.tokens.FirstKnown)
if testCase.fromBlock == 0 {
require.Equal(t, 0, blRange.tokens.FirstKnown.Cmp(zero))
}
}
}
})
}
}