From 2d964bfe9f9554fdb34c6c19095f3c5e88c527e6 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 9 Feb 2018 15:37:56 +0200 Subject: [PATCH] Remove async operations from node manager (#584) The main goal of this change is to remove async operations from node manager. Additionally all of the signals from node manager are moved to status backend. All of the async operations will now have the following behaviour: - If the node is in the correct state, exit immediately without error - If the node is not in the correct state, exit immediately with an error - In all other cases, spawn a goroutine with the wanted operation - All the progress regarding that operation will be reported by using signals - Signals should be handled in one place, which is StatusBackend There are 2 potentially breaking changes: - Empty event field will be omitted when Envelope is sent to a client - All errors will be delivered to a client as an Envelope, previously some errors (NodeExists, NoRunningNode) were delivered synchronously Signed-off-by: Dmitry Shulyak --- cmd/statusd/debug/commands.go | 3 +- cmd/statusd/main.go | 24 +- cmd/statusd/sync.go | 5 +- geth/api/api.go | 44 +--- geth/api/backend.go | 152 +++++------ geth/api/utils.go | 11 + geth/common/types.go | 15 +- geth/common/types_mock.go | 50 +--- geth/node/manager.go | 364 +++++++------------------- geth/node/utils.go | 25 -- lib/library.go | 12 +- t/e2e/accounts/accounts_test.go | 8 +- t/e2e/api/api_test.go | 78 +++++- t/e2e/api/backend_test.go | 47 ++-- t/e2e/node/manager_test.go | 168 ++---------- t/e2e/rpc/rpc_test.go | 20 +- t/e2e/suites.go | 24 +- t/e2e/whisper/whisper_mailbox_test.go | 16 +- t/utils/utils.go | 2 +- 19 files changed, 329 insertions(+), 739 deletions(-) create mode 100644 geth/api/utils.go delete mode 100644 geth/node/utils.go diff --git a/cmd/statusd/debug/commands.go b/cmd/statusd/debug/commands.go index 27980392f..b9affa10e 100644 --- a/cmd/statusd/debug/commands.go +++ b/cmd/statusd/debug/commands.go @@ -135,8 +135,7 @@ func (cs 
*commandSet) StopNode() error { // ResetChainData removes chain data from data directory. func (cs *commandSet) ResetChainData() error { - _, err := cs.statusAPI.ResetChainDataAsync() - return err + return cs.statusAPI.ResetChainData() } // CallRPC calls status node via RPC. diff --git a/cmd/statusd/main.go b/cmd/statusd/main.go index 7006ebe5d..4fda137c4 100644 --- a/cmd/statusd/main.go +++ b/cmd/statusd/main.go @@ -10,7 +10,6 @@ import ( "os/signal" "runtime" "strings" - "time" "github.com/status-im/status-go/cmd/statusd/debug" "github.com/status-im/status-go/geth/api" @@ -86,7 +85,7 @@ func main() { } backend := api.NewStatusBackend() - started, err := backend.StartNode(config) + err = backend.StartNode(config) if err != nil { log.Fatalf("Node start failed: %v", err) return @@ -94,10 +93,6 @@ func main() { // handle interrupt signals interruptCh := haltOnInterruptSignal(backend.NodeManager()) - - // wait till node is started - <-started - // Check if debugging CLI connection shall be enabled. if *cliEnabled { err := startDebug(backend) @@ -282,30 +277,17 @@ Options: // if the node can not be stopped. 
func haltOnInterruptSignal(nodeManager common.NodeManager) <-chan struct{} { interruptCh := make(chan struct{}) - go func() { signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt) defer signal.Stop(signalCh) <-signalCh - close(interruptCh) - log.Println("Got interrupt, shutting down...") - - nodeStopped, err := nodeManager.StopNode() - if err != nil { - log.Printf("Failed to stop node: %v", err) - os.Exit(1) - } - - select { - case <-nodeStopped: - case <-time.After(time.Second * 5): - log.Printf("Stopping node timed out") + if err := nodeManager.StopNode(); err != nil { + log.Printf("Failed to stop node: %v", err.Error()) os.Exit(1) } }() - return interruptCh } diff --git a/cmd/statusd/sync.go b/cmd/statusd/sync.go index 06508ddab..09c4f1678 100644 --- a/cmd/statusd/sync.go +++ b/cmd/statusd/sync.go @@ -46,12 +46,9 @@ func syncAndStopNode(interruptCh <-chan struct{}, nodeManager common.NodeManager return -1 } - done, err := nodeManager.StopNode() - if err != nil { + if err := nodeManager.StopNode(); err != nil { log.Printf("syncAndStopNode: failed to stop the node: %v", err) return 1 } - <-done - return } diff --git a/geth/api/api.go b/geth/api/api.go index 4234ea4c9..f7313c75c 100644 --- a/geth/api/api.go +++ b/geth/api/api.go @@ -52,65 +52,45 @@ func (api *StatusAPI) TxQueueManager() *transactions.Manager { // StartNode start Status node, fails if node is already started func (api *StatusAPI) StartNode(config *params.NodeConfig) error { - nodeStarted, err := api.b.StartNode(config) - if err != nil { - return err - } - <-nodeStarted - return nil + return api.b.StartNode(config) } // StartNodeAsync start Status node, fails if node is already started // Returns immediately w/o waiting for node to start (see node.ready) -func (api *StatusAPI) StartNodeAsync(config *params.NodeConfig) (<-chan struct{}, error) { - return api.b.StartNode(config) +func (api *StatusAPI) StartNodeAsync(config *params.NodeConfig) <-chan error { + return runAsync(func() 
error { return api.StartNode(config) }) } // StopNode stop Status node. Stopped node cannot be resumed. func (api *StatusAPI) StopNode() error { - nodeStopped, err := api.b.StopNode() - if err != nil { - return err - } - <-nodeStopped - return nil + return api.b.StopNode() } // StopNodeAsync stop Status node. Stopped node cannot be resumed. // Returns immediately, w/o waiting for node to stop (see node.stopped) -func (api *StatusAPI) StopNodeAsync() (<-chan struct{}, error) { - return api.b.StopNode() +func (api *StatusAPI) StopNodeAsync() <-chan error { + return runAsync(api.StopNode) } // RestartNode restart running Status node, fails if node is not running func (api *StatusAPI) RestartNode() error { - nodeStarted, err := api.b.RestartNode() - if err != nil { - return err - } - <-nodeStarted // do not return up until backend is ready - return nil + return api.b.RestartNode() } // RestartNodeAsync restart running Status node, in async manner -func (api *StatusAPI) RestartNodeAsync() (<-chan struct{}, error) { - return api.b.RestartNode() +func (api *StatusAPI) RestartNodeAsync() <-chan error { + return runAsync(api.RestartNode) } // ResetChainData remove chain data from data directory. // Node is stopped, and new node is started, with clean data directory. 
func (api *StatusAPI) ResetChainData() error { - nodeStarted, err := api.b.ResetChainData() - if err != nil { - return err - } - <-nodeStarted // do not return up until backend is ready - return nil + return api.b.ResetChainData() } // ResetChainDataAsync remove chain data from data directory, in async manner -func (api *StatusAPI) ResetChainDataAsync() (<-chan struct{}, error) { - return api.b.ResetChainData() +func (api *StatusAPI) ResetChainDataAsync() <-chan error { + return runAsync(api.ResetChainData) } // CallRPC executes RPC request on node's in-proc RPC server diff --git a/geth/api/backend.go b/geth/api/backend.go index ad5286936..f395ec7ec 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -2,6 +2,7 @@ package api import ( "context" + "fmt" "sync" gethcommon "github.com/ethereum/go-ethereum/common" @@ -23,9 +24,8 @@ const ( // StatusBackend implements Status.im service type StatusBackend struct { - sync.Mutex - nodeReady chan struct{} // channel to wait for when node is fully ready - nodeManager common.NodeManager + mu sync.Mutex + nodeManager *node.NodeManager accountManager common.AccountManager txQueueManager *transactions.Manager jailManager common.JailManager @@ -77,120 +77,99 @@ func (m *StatusBackend) IsNodeRunning() bool { } // StartNode start Status node, fails if node is already started -func (m *StatusBackend) StartNode(config *params.NodeConfig) (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - - if m.nodeReady != nil { - return nil, node.ErrNodeExists - } - - nodeStarted, err := m.nodeManager.StartNode(config) - if err != nil { - return nil, err - } - - m.nodeReady = make(chan struct{}, 1) - go m.onNodeStart(nodeStarted, m.nodeReady) // waits on nodeStarted, writes to backendReady - - return m.nodeReady, err +func (m *StatusBackend) StartNode(config *params.NodeConfig) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.startNode(config) } -// onNodeStart does everything required to prepare backend -func (m 
*StatusBackend) onNodeStart(nodeStarted <-chan struct{}, backendReady chan struct{}) { - <-nodeStarted - +func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("node crashed on start: %v", err) + } + }() + err = m.nodeManager.StartNode(config) + if err != nil { + switch err.(type) { + case node.RPCClientError: + err = node.ErrRPCClient + case node.EthNodeError: + err = fmt.Errorf("%v: %v", node.ErrNodeStartFailure, err) + } + signal.Send(signal.Envelope{ + Type: signal.EventNodeCrashed, + Event: signal.NodeCrashEvent{ + Error: err, + }, + }) + return err + } + signal.Send(signal.Envelope{Type: signal.EventNodeStarted}) // tx queue manager should be started after node is started, it depends // on rpc client being created m.txQueueManager.Start() - if err := m.registerHandlers(); err != nil { log.Error("Handler registration failed", "err", err) } - if err := m.accountManager.ReSelectAccount(); err != nil { log.Error("Reselect account failed", "err", err) } log.Info("Account reselected") - - close(backendReady) - signal.Send(signal.Envelope{ - Type: signal.EventNodeReady, - Event: struct{}{}, - }) + signal.Send(signal.Envelope{Type: signal.EventNodeReady}) + return nil } // StopNode stop Status node. Stopped node cannot be resumed. 
-func (m *StatusBackend) StopNode() (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() +func (m *StatusBackend) StopNode() error { + m.mu.Lock() + defer m.mu.Unlock() + return m.stopNode() +} - if m.nodeReady == nil { - return nil, node.ErrNoRunningNode +func (m *StatusBackend) stopNode() error { + if !m.IsNodeRunning() { + return node.ErrNoRunningNode } - <-m.nodeReady - m.txQueueManager.Stop() m.jailManager.Stop() - - nodeStopped, err := m.nodeManager.StopNode() - if err != nil { - return nil, err - } - - backendStopped := make(chan struct{}, 1) - go func() { - <-nodeStopped - m.Lock() - m.nodeReady = nil - m.Unlock() - close(backendStopped) - }() - - return backendStopped, nil + return m.nodeManager.StopNode() } // RestartNode restart running Status node, fails if node is not running -func (m *StatusBackend) RestartNode() (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - - if m.nodeReady == nil { - return nil, node.ErrNoRunningNode +func (m *StatusBackend) RestartNode() error { + if !m.IsNodeRunning() { + return node.ErrNoRunningNode } - <-m.nodeReady - - nodeRestarted, err := m.nodeManager.RestartNode() + config, err := m.nodeManager.NodeConfig() if err != nil { - return nil, err + return err } - - m.nodeReady = make(chan struct{}, 1) - go m.onNodeStart(nodeRestarted, m.nodeReady) // waits on nodeRestarted, writes to backendReady - - return m.nodeReady, err + newcfg := *config + if err := m.stopNode(); err != nil { + return err + } + return m.startNode(&newcfg) } // ResetChainData remove chain data from data directory. // Node is stopped, and new node is started, with clean data directory. 
-func (m *StatusBackend) ResetChainData() (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - - if m.nodeReady == nil { - return nil, node.ErrNoRunningNode - } - <-m.nodeReady - - nodeReset, err := m.nodeManager.ResetChainData() +func (m *StatusBackend) ResetChainData() error { + m.mu.Lock() + defer m.mu.Unlock() + config, err := m.nodeManager.NodeConfig() if err != nil { - return nil, err + return err } - - m.nodeReady = make(chan struct{}, 1) - go m.onNodeStart(nodeReset, m.nodeReady) // waits on nodeReset, writes to backendReady - - return m.nodeReady, err + newcfg := *config + if err := m.stopNode(); err != nil { + return err + } + if err := m.ResetChainData(); err != nil { + return err + } + signal.Send(signal.Envelope{Type: signal.EventChainDataRemoved}) + return m.startNode(&newcfg) } // CallRPC executes RPC request on node's in-proc RPC server @@ -241,7 +220,6 @@ func (m *StatusBackend) registerHandlers() error { if rpcClient == nil { return node.ErrRPCClient } - rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler()) rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler) return nil diff --git a/geth/api/utils.go b/geth/api/utils.go new file mode 100644 index 000000000..c61fb137d --- /dev/null +++ b/geth/api/utils.go @@ -0,0 +1,11 @@ +package api + +func runAsync(f func() error) <-chan error { + resp := make(chan error, 1) + go func() { + err := f() + resp <- err + close(resp) + }() + return resp +} diff --git a/geth/common/types.go b/geth/common/types.go index 00f0823ef..f1b80245b 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -47,21 +47,14 @@ func (k *SelectedExtKey) Hex() string { // NodeManager defines expected methods for managing Status node type NodeManager interface { // StartNode start Status node, fails if node is already started - StartNode(config *params.NodeConfig) (<-chan struct{}, error) - - // StopNode stop the running Status node. 
- // Stopped node cannot be resumed, one starts a new node instead. - StopNode() (<-chan struct{}, error) + StartNode(config *params.NodeConfig) error // EnsureSync waits until blockchain is synchronized. EnsureSync(ctx context.Context) error - // RestartNode restart running Status node, fails if node is not running - RestartNode() (<-chan struct{}, error) - - // ResetChainData remove chain data from data directory. - // Node is stopped, and new node is started, with clean data directory. - ResetChainData() (<-chan struct{}, error) + // StopNode stop the running Status node. + // Stopped node cannot be resumed, one starts a new node instead. + StopNode() error // IsNodeRunning confirm that node is running IsNodeRunning() bool diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index 143d3c3ef..9f8fede64 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -43,11 +43,10 @@ func (m *MockNodeManager) EXPECT() *MockNodeManagerMockRecorder { } // StartNode mocks base method -func (m *MockNodeManager) StartNode(config *params.NodeConfig) (<-chan struct{}, error) { +func (m *MockNodeManager) StartNode(config *params.NodeConfig) error { ret := m.ctrl.Call(m, "StartNode", config) - ret0, _ := ret[0].(<-chan struct{}) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // StartNode indicates an expected call of StartNode @@ -55,19 +54,6 @@ func (mr *MockNodeManagerMockRecorder) StartNode(config interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartNode", reflect.TypeOf((*MockNodeManager)(nil).StartNode), config) } -// StopNode mocks base method -func (m *MockNodeManager) StopNode() (<-chan struct{}, error) { - ret := m.ctrl.Call(m, "StopNode") - ret0, _ := ret[0].(<-chan struct{}) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// StopNode indicates an expected call of StopNode -func (mr *MockNodeManagerMockRecorder) StopNode() *gomock.Call { - return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopNode", reflect.TypeOf((*MockNodeManager)(nil).StopNode)) -} - // EnsureSync mocks base method func (m *MockNodeManager) EnsureSync(ctx context.Context) error { ret := m.ctrl.Call(m, "EnsureSync", ctx) @@ -80,30 +66,16 @@ func (mr *MockNodeManagerMockRecorder) EnsureSync(ctx interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureSync", reflect.TypeOf((*MockNodeManager)(nil).EnsureSync), ctx) } -// RestartNode mocks base method -func (m *MockNodeManager) RestartNode() (<-chan struct{}, error) { - ret := m.ctrl.Call(m, "RestartNode") - ret0, _ := ret[0].(<-chan struct{}) - ret1, _ := ret[1].(error) - return ret0, ret1 +// StopNode mocks base method +func (m *MockNodeManager) StopNode() error { + ret := m.ctrl.Call(m, "StopNode") + ret0, _ := ret[0].(error) + return ret0 } -// RestartNode indicates an expected call of RestartNode -func (mr *MockNodeManagerMockRecorder) RestartNode() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RestartNode", reflect.TypeOf((*MockNodeManager)(nil).RestartNode)) -} - -// ResetChainData mocks base method -func (m *MockNodeManager) ResetChainData() (<-chan struct{}, error) { - ret := m.ctrl.Call(m, "ResetChainData") - ret0, _ := ret[0].(<-chan struct{}) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ResetChainData indicates an expected call of ResetChainData -func (mr *MockNodeManagerMockRecorder) ResetChainData() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetChainData", reflect.TypeOf((*MockNodeManager)(nil).ResetChainData)) +// StopNode indicates an expected call of StopNode +func (mr *MockNodeManagerMockRecorder) StopNode() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopNode", reflect.TypeOf((*MockNodeManager)(nil).StopNode)) } // IsNodeRunning mocks base method diff --git a/geth/node/manager.go b/geth/node/manager.go index 4d247363b..ab1dc1495 100644 --- 
a/geth/node/manager.go +++ b/geth/node/manager.go @@ -19,7 +19,6 @@ import ( "github.com/status-im/status-go/geth/mailservice" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/rpc" - "github.com/status-im/status-go/geth/signal" ) // errors @@ -34,15 +33,20 @@ var ( ErrRPCClient = errors.New("failed to init RPC client") ) +// RPCClientError reported when rpc client is initialized. +type RPCClientError error + +// EthNodeError is reported when node crashed on start up. +type EthNodeError error + // NodeManager manages Status node (which abstracts contained geth node) // nolint: golint // should be fixed at https://github.com/status-im/status-go/issues/200 type NodeManager struct { - sync.RWMutex - config *params.NodeConfig // Status node configuration - node *node.Node // reference to Geth P2P stack/node - nodeStarted chan struct{} // channel to wait for start up notifications - nodeStopped chan struct{} // channel to wait for termination notifications + mu sync.RWMutex + config *params.NodeConfig // Status node configuration + node *node.Node // reference to Geth P2P stack/node + whisperService *whisper.Whisper // reference to Whisper service lesService *les.LightEthereum // reference to LES service rpcClient *rpc.Client // reference to RPC client @@ -54,201 +58,131 @@ func NewNodeManager() *NodeManager { } // StartNode start Status node, fails if node is already started -func (m *NodeManager) StartNode(config *params.NodeConfig) (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - +func (m *NodeManager) StartNode(config *params.NodeConfig) error { + m.mu.Lock() + defer m.mu.Unlock() return m.startNode(config) } // startNode start Status node, fails if node is already started -func (m *NodeManager) startNode(config *params.NodeConfig) (<-chan struct{}, error) { - if m.node != nil || m.nodeStarted != nil { - return nil, ErrNodeExists +func (m *NodeManager) startNode(config *params.NodeConfig) error { + if err := 
m.isNodeAvailable(); err == nil { + return ErrNodeExists } - m.initLog(config) ethNode, err := MakeNode(config) if err != nil { - return nil, err + return err } + m.node = ethNode + m.config = config // activate MailService required for Offline Inboxing if err := ethNode.Register(func(_ *node.ServiceContext) (node.Service, error) { return mailservice.New(m), nil }); err != nil { - return nil, err + return err } - m.nodeStarted = make(chan struct{}, 1) - + // start underlying node + if err := ethNode.Start(); err != nil { + return EthNodeError(err) + } + // init RPC client for this node + localRPCClient, err := m.node.Attach() + if err == nil { + m.rpcClient, err = rpc.NewClient(localRPCClient, m.config.UpstreamConfig) + } + if err != nil { + log.Error("Failed to create an RPC client", "error", err) + return RPCClientError(err) + } + // populate static peers exits when node stopped go func() { - defer HaltOnPanic() - - // start underlying node - if startErr := ethNode.Start(); startErr != nil { - close(m.nodeStarted) - m.Lock() - m.nodeStarted = nil - m.Unlock() - signal.Send(signal.Envelope{ - Type: signal.EventNodeCrashed, - Event: signal.NodeCrashEvent{ - Error: fmt.Errorf("%v: %v", ErrNodeStartFailure, startErr), - }, - }) - return + if err := m.PopulateStaticPeers(); err != nil { + log.Error("Static peers population", "error", err) } - - m.Lock() - m.node = ethNode - m.nodeStopped = make(chan struct{}, 1) - m.config = config - - // init RPC client for this node - localRPCClient, errRPC := m.node.Attach() - if errRPC == nil { - m.rpcClient, errRPC = rpc.NewClient(localRPCClient, m.config.UpstreamConfig) - } - - if errRPC != nil { - log.Error("Failed to create an RPC client", "error", errRPC) - - m.Unlock() - signal.Send(signal.Envelope{ - Type: signal.EventNodeCrashed, - Event: signal.NodeCrashEvent{ - Error: ErrRPCClient, - }, - }) - return - } - - m.Unlock() - - // underlying node is started, every method can use it, we use it immediately - go func() { - if 
err := m.PopulateStaticPeers(); err != nil { - log.Error("Static peers population", "error", err) - } - }() - - // notify all subscribers that Status node is started - close(m.nodeStarted) - signal.Send(signal.Envelope{ - Type: signal.EventNodeStarted, - Event: struct{}{}, - }) - - // wait up until underlying node is stopped - m.node.Wait() - - // notify m.Stop() that node has been stopped - close(m.nodeStopped) - log.Info("Node is stopped") }() - - return m.nodeStarted, nil + return nil } // StopNode stop Status node. Stopped node cannot be resumed. -func (m *NodeManager) StopNode() (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - - if err := m.isNodeAvailable(); err != nil { - return nil, err - } - if m.nodeStopped == nil { - return nil, ErrNoRunningNode - } - - <-m.nodeStarted // make sure you operate on fully started node - +func (m *NodeManager) StopNode() error { + m.mu.Lock() + defer m.mu.Unlock() return m.stopNode() } // stopNode stop Status node. Stopped node cannot be resumed. 
-func (m *NodeManager) stopNode() (<-chan struct{}, error) { - // now attempt to stop - if err := m.node.Stop(); err != nil { - return nil, err +func (m *NodeManager) stopNode() error { + if err := m.isNodeAvailable(); err != nil { + return err } + if err := m.node.Stop(); err != nil { + return err + } + m.node = nil + m.config = nil + m.lesService = nil + m.whisperService = nil + m.rpcClient = nil + return nil +} - nodeStopped := make(chan struct{}, 1) - go func() { - <-m.nodeStopped // Status node is stopped (code after Wait() is executed) - log.Info("Ready to reset node") - - // reset node params - m.Lock() - m.config = nil - m.lesService = nil - m.whisperService = nil - m.rpcClient = nil - m.nodeStarted = nil - m.node = nil - m.Unlock() - - close(nodeStopped) // Status node is stopped, and we can create another - log.Info("Node manager resets node params") - - // notify application that it can send more requests now - signal.Send(signal.Envelope{ - Type: signal.EventNodeStopped, - Event: struct{}{}, - }) - log.Info("Node manager notifed app, that node has stopped") - }() - - return nodeStopped, nil +// ResetChainData removes chain data if node is not running. +func (m *NodeManager) ResetChainData() error { + if !m.IsNodeRunning() { + return ErrNoRunningNode + } + m.mu.Lock() + defer m.mu.Unlock() + chainDataDir := filepath.Join(m.config.DataDir, m.config.Name, "lightchaindata") + if _, err := os.Stat(chainDataDir); os.IsNotExist(err) { + // is it really an error, if we want to remove it as next step? 
+ return err + } + err := os.RemoveAll(chainDataDir) + if err == nil { + log.Info("Chain data has been removed", "dir", chainDataDir) + } + return err } // IsNodeRunning confirm that node is running func (m *NodeManager) IsNodeRunning() bool { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return false } - - <-m.nodeStarted - return true } // Node returns underlying Status node func (m *NodeManager) Node() (*node.Node, error) { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return nil, err } - - <-m.nodeStarted - return m.node, nil } // PopulateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster func (m *NodeManager) PopulateStaticPeers() error { - m.RLock() - defer m.RUnlock() - - if err := m.isNodeAvailable(); err != nil { - return err - } - - <-m.nodeStarted - + m.mu.RLock() + defer m.mu.RUnlock() return m.populateStaticPeers() } // populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster func (m *NodeManager) populateStaticPeers() error { + if err := m.isNodeAvailable(); err != nil { + return err + } if !m.config.BootClusterConfig.Enabled { log.Info("Boot cluster is disabled") return nil @@ -268,211 +202,107 @@ func (m *NodeManager) populateStaticPeers() error { // AddPeer adds new static peer node func (m *NodeManager) AddPeer(url string) error { - m.RLock() - defer m.RUnlock() - + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return err } - - <-m.nodeStarted - return m.addPeer(url) } // addPeer adds new static peer node func (m *NodeManager) addPeer(url string) error { - server := m.node.Server() - if server == nil { - return ErrNoRunningNode - } - // Try to add the url as a static peer and return parsedNode, err := discover.ParseNode(url) if err != nil { return err } - server.AddPeer(parsedNode) - + 
m.node.Server().AddPeer(parsedNode) return nil } // PeerCount returns the number of connected peers. func (m *NodeManager) PeerCount() int { - if m.node == nil || m.node.Server() == nil { + if !m.IsNodeRunning() { return 0 } return m.node.Server().PeerCount() } -// ResetChainData remove chain data from data directory. -// Node is stopped, and new node is started, with clean data directory. -func (m *NodeManager) ResetChainData() (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - - if err := m.isNodeAvailable(); err != nil { - return nil, err - } - - <-m.nodeStarted - - return m.resetChainData() -} - -// resetChainData remove chain data from data directory. -// Node is stopped, and new node is started, with clean data directory. -func (m *NodeManager) resetChainData() (<-chan struct{}, error) { - prevConfig := *m.config - nodeStopped, err := m.stopNode() - if err != nil { - return nil, err - } - - m.Unlock() - <-nodeStopped - m.Lock() - - chainDataDir := filepath.Join(prevConfig.DataDir, prevConfig.Name, "lightchaindata") - if _, err := os.Stat(chainDataDir); os.IsNotExist(err) { - return nil, err - } - if err := os.RemoveAll(chainDataDir); err != nil { - return nil, err - } - // send signal up to native app - signal.Send(signal.Envelope{ - Type: signal.EventChainDataRemoved, - Event: struct{}{}, - }) - log.Info("Chain data has been removed", "dir", chainDataDir) - - return m.startNode(&prevConfig) -} - -// RestartNode restart running Status node, fails if node is not running -func (m *NodeManager) RestartNode() (<-chan struct{}, error) { - m.Lock() - defer m.Unlock() - - if err := m.isNodeAvailable(); err != nil { - return nil, err - } - - <-m.nodeStarted - - return m.restartNode() -} - -// restartNode restart running Status node, fails if node is not running -func (m *NodeManager) restartNode() (<-chan struct{}, error) { - prevConfig := *m.config - nodeStopped, err := m.stopNode() - if err != nil { - return nil, err - } - - m.Unlock() - <-nodeStopped - 
m.Lock() - - return m.startNode(&prevConfig) -} - // NodeConfig exposes reference to running node's configuration func (m *NodeManager) NodeConfig() (*params.NodeConfig, error) { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return nil, err } - - <-m.nodeStarted - return m.config, nil } // LightEthereumService exposes reference to LES service running on top of the node func (m *NodeManager) LightEthereumService() (*les.LightEthereum, error) { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return nil, err } - - <-m.nodeStarted - if m.lesService == nil { if err := m.node.Service(&m.lesService); err != nil { log.Warn("Cannot obtain LES service", "error", err) return nil, ErrInvalidLightEthereumService } } - if m.lesService == nil { return nil, ErrInvalidLightEthereumService } - return m.lesService, nil } // WhisperService exposes reference to Whisper service running on top of the node func (m *NodeManager) WhisperService() (*whisper.Whisper, error) { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return nil, err } - - <-m.nodeStarted - if m.whisperService == nil { if err := m.node.Service(&m.whisperService); err != nil { log.Warn("Cannot obtain whisper service", "error", err) return nil, ErrInvalidWhisperService } } - if m.whisperService == nil { return nil, ErrInvalidWhisperService } - return m.whisperService, nil } // AccountManager exposes reference to node's accounts manager func (m *NodeManager) AccountManager() (*accounts.Manager, error) { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return nil, err } - - <-m.nodeStarted - accountManager := m.node.AccountManager() if accountManager == nil { return nil, ErrInvalidAccountManager } - return accountManager, nil } // AccountKeyStore exposes reference 
to accounts key store func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() if err := m.isNodeAvailable(); err != nil { return nil, err } - - <-m.nodeStarted - accountManager := m.node.AccountManager() if accountManager == nil { return nil, ErrInvalidAccountManager @@ -493,9 +323,8 @@ func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) { // RPCClient exposes reference to RPC client connected to the running node. func (m *NodeManager) RPCClient() *rpc.Client { - m.Lock() - defer m.Unlock() - + m.mu.Lock() + defer m.mu.Unlock() return m.rpcClient } @@ -514,10 +343,9 @@ func (m *NodeManager) initLog(config *params.NodeConfig) { // isNodeAvailable check if we have a node running and make sure is fully started func (m *NodeManager) isNodeAvailable() error { - if m.nodeStarted == nil || m.node == nil { + if m.node == nil || m.node.Server() == nil { return ErrNoRunningNode } - return nil } diff --git a/geth/node/utils.go b/geth/node/utils.go deleted file mode 100644 index 5923d6a1d..000000000 --- a/geth/node/utils.go +++ /dev/null @@ -1,25 +0,0 @@ -package node - -import ( - "fmt" - - "github.com/status-im/status-go/geth/common" - "github.com/status-im/status-go/geth/signal" -) - -// HaltOnPanic recovers from panic, logs issue, sends upward notification, and exits -func HaltOnPanic() { - if r := recover(); r != nil { - err := fmt.Errorf("%v: %v", ErrNodeRunFailure, r) - - // send signal up to native app - signal.Send(signal.Envelope{ - Type: signal.EventNodeCrashed, - Event: signal.NodeCrashEvent{ - Error: err, - }, - }) - - common.Fatalf(ErrNodeRunFailure, r) // os.exit(1) is called internally - } -} diff --git a/lib/library.go b/lib/library.go index 3c6ccf4ec..3724396f4 100644 --- a/lib/library.go +++ b/lib/library.go @@ -38,15 +38,15 @@ func StartNode(configJSON *C.char) *C.char { return makeJSONResponse(err) } - _, err = statusAPI.StartNodeAsync(config) - return 
makeJSONResponse(err) + statusAPI.StartNodeAsync(config) + return makeJSONResponse(nil) } //StopNode - stop status node //export StopNode func StopNode() *C.char { - _, err := statusAPI.StopNodeAsync() - return makeJSONResponse(err) + statusAPI.StopNodeAsync() + return makeJSONResponse(nil) } //ValidateNodeConfig validates config for status node @@ -95,8 +95,8 @@ func ValidateNodeConfig(configJSON *C.char) *C.char { //ResetChainData remove chain data from data directory //export ResetChainData func ResetChainData() *C.char { - _, err := statusAPI.ResetChainDataAsync() - return makeJSONResponse(err) + statusAPI.ResetChainDataAsync() + return makeJSONResponse(nil) } //CallRPC calls status node via rpc diff --git a/t/e2e/accounts/accounts_test.go b/t/e2e/accounts/accounts_test.go index 4aa673ee9..ddf34c2b5 100644 --- a/t/e2e/accounts/accounts_test.go +++ b/t/e2e/accounts/accounts_test.go @@ -254,9 +254,7 @@ func (s *AccountsTestSuite) TestSelectedAccountOnRestart() { nodeConfig, err := s.Backend.NodeManager().NodeConfig() s.NoError(err) preservedNodeConfig := *nodeConfig - nodeStoped, err := s.Backend.StopNode() - s.NoError(err) - <-nodeStoped + s.NoError(s.Backend.StopNode()) // make sure that account is still selected selectedAccount, err = s.Backend.AccountManager().SelectedAccount() @@ -265,9 +263,7 @@ func (s *AccountsTestSuite) TestSelectedAccountOnRestart() { s.Equal(selectedAccount.Address.Hex(), address2, "incorrect address selected") // resume node - nodeStarted, err := s.Backend.StartNode(&preservedNodeConfig) - s.NoError(err) - <-nodeStarted + s.NoError(s.Backend.StartNode(&preservedNodeConfig)) // re-check selected account (account2 MUST be selected) selectedAccount, err = s.Backend.AccountManager().SelectedAccount() diff --git a/t/e2e/api/api_test.go b/t/e2e/api/api_test.go index 02b2c08e8..a3acea723 100644 --- a/t/e2e/api/api_test.go +++ b/t/e2e/api/api_test.go @@ -1,6 +1,7 @@ package api_test import ( + "encoding/json" "io/ioutil" "math/rand" "os" @@ 
-10,7 +11,9 @@ import ( "github.com/status-im/status-go/geth/api" "github.com/status-im/status-go/geth/log" + "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/params" + "github.com/status-im/status-go/geth/signal" e2e "github.com/status-im/status-go/t/e2e" . "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -29,6 +32,12 @@ type APITestSuite struct { api *api.StatusAPI } +func (s *APITestSuite) ensureNodeStopped() { + if err := s.api.StopNode(); err != node.ErrNoRunningNode && err != nil { + s.NoError(err, "unexpected error") + } +} + func (s *APITestSuite) SetupTest() { s.api = api.NewStatusAPI() s.NotNil(s.api) @@ -67,29 +76,29 @@ func (s *APITestSuite) TestRaceConditions() { var funcsToTest = []func(*params.NodeConfig){ func(config *params.NodeConfig) { log.Info("StartNodeAsync()") - _, err := s.api.StartNodeAsync(config) - s.T().Logf("StartNodeAsync() for network: %d, error: %v", config.NetworkID, err) + s.api.StartNodeAsync(config) + s.T().Logf("StartNodeAsync() for network: %d", config.NetworkID) progress <- struct{}{} }, func(config *params.NodeConfig) { log.Info("StopNodeAsync()") - _, err := s.api.StopNodeAsync() - s.T().Logf("StopNodeAsync(), error: %v", err) + s.api.StopNodeAsync() + s.T().Logf("StopNodeAsync()") progress <- struct{}{} }, func(config *params.NodeConfig) { log.Info("RestartNodeAsync()") - _, err := s.api.RestartNodeAsync() - s.T().Logf("RestartNodeAsync(), error: %v", err) + s.api.RestartNodeAsync() + s.T().Logf("RestartNodeAsync()") progress <- struct{}{} }, // TODO(adam): quarantined until it uses a different datadir // as otherwise it wipes out cached blockchain data. 
// func(config *params.NodeConfig) { - // log.Info("ResetChainDataAsync()") - // _, err := s.api.ResetChainDataAsync() - // s.T().Logf("ResetChainDataAsync(), error: %v", err) - // progress <- struct{}{} + // log.Info("ResetChainDataAsync()") + // _, err := s.api.ResetChainDataAsync() + // s.T().Logf("ResetChainDataAsync(), error: %v", err) + // progress <- struct{}{} // }, } @@ -117,7 +126,7 @@ func (s *APITestSuite) TestRaceConditions() { time.Sleep(2 * time.Second) // so that we see some logs // just in case we have a node running - s.api.StopNode() //nolint: errcheck + s.ensureNodeStopped() } func (s *APITestSuite) TestCellsRemovedAfterSwitchAccount() { @@ -186,3 +195,50 @@ func (s *APITestSuite) TestLogoutRemovesCells() { _, err = s.api.JailManager().Cell(testChatID) require.Error(err, "Expected that cells was removed") } + +func (s *APITestSuite) TestNodeStartCrash() { + // let's listen for node.crashed signal + signalReceived := make(chan struct{}) + signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + var envelope signal.Envelope + err := json.Unmarshal([]byte(jsonEvent), &envelope) + s.NoError(err) + + if envelope.Type == signal.EventNodeCrashed { + close(signalReceived) + } + }) + defer signal.ResetDefaultNodeNotificationHandler() + + nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID()) + s.NoError(err) + + // start node outside the manager (on the same port), so that manager node.Start() method fails + outsideNode, err := node.MakeNode(nodeConfig) + s.NoError(err) + err = outsideNode.Start() + s.NoError(err) + + // now try starting using node manager, it should fail (error is irrelevant as it is implementation detail) + s.Error(<-s.api.StartNodeAsync(nodeConfig)) + + select { + case <-time.After(500 * time.Millisecond): + s.FailNow("timed out waiting for signal") + case <-signalReceived: + } + + // stop outside node, and re-try + s.NoError(outsideNode.Stop()) + signalReceived = make(chan struct{}) + 
s.NoError(<-s.api.StartNodeAsync(nodeConfig)) + + select { + case <-time.After(500 * time.Millisecond): + case <-signalReceived: + s.FailNow("signal should not be received") + } + + // cleanup + s.NoError(s.api.StopNode()) +} diff --git a/t/e2e/api/backend_test.go b/t/e2e/api/backend_test.go index 0decdb031..e75ac432e 100644 --- a/t/e2e/api/backend_test.go +++ b/t/e2e/api/backend_test.go @@ -45,13 +45,13 @@ func (s *APIBackendTestSuite) TestRaceConditions() { var funcsToTest = []func(*params.NodeConfig){ func(config *params.NodeConfig) { log.Info("StartNode()") - _, err := s.Backend.StartNode(config) + err := s.Backend.StartNode(config) s.T().Logf("StartNode() for network: %d, error: %v", config.NetworkID, err) progress <- struct{}{} }, func(config *params.NodeConfig) { log.Info("StopNode()") - _, err := s.Backend.StopNode() + err := s.Backend.StopNode() s.T().Logf("StopNode() for network: %d, error: %v", config.NetworkID, err) progress <- struct{}{} }, @@ -63,7 +63,7 @@ func (s *APIBackendTestSuite) TestRaceConditions() { // }, func(config *params.NodeConfig) { log.Info("RestartNode()") - _, err := s.Backend.RestartNode() + err := s.Backend.RestartNode() s.T().Logf("RestartNode(), error: %v", err) progress <- struct{}{} }, @@ -174,16 +174,15 @@ func (s *APIBackendTestSuite) TestRaceConditions() { } for range progress { - cnt -= 1 + cnt-- if cnt <= 0 { break } } - time.Sleep(2 * time.Second) // so that we see some logs - nodeStopped, _ := s.Backend.StopNode() // just in case we have a node running - if nodeStopped != nil { - <-nodeStopped + time.Sleep(2 * time.Second) // so that we see some logs + if err := s.Backend.StopNode(); err != node.ErrNoRunningNode && err != nil { + s.NoError(err, "unexpected error") } } @@ -195,10 +194,7 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() { s.NoError(err) s.False(s.Backend.IsNodeRunning()) - nodeStarted, err := s.Backend.StartNode(nodeConfig) - s.NoError(err) - - <-nodeStarted // wait till node is started + 
s.NoError(s.Backend.StartNode(nodeConfig)) s.True(s.Backend.IsNodeRunning()) firstHash, err := e2e.FirstBlockHash(s.Backend.NodeManager()) @@ -206,19 +202,14 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() { s.Equal(GetHeadHash(), firstHash) // now stop node, and make sure that a new node, on different network can be started - nodeStopped, err := s.Backend.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.Backend.StopNode()) // start new node with completely different config nodeConfig, err = e2e.MakeTestNodeConfig(GetNetworkID()) s.NoError(err) s.False(s.Backend.IsNodeRunning()) - nodeStarted, err = s.Backend.StartNode(nodeConfig) - s.NoError(err) - - <-nodeStarted + s.NoError(s.Backend.StartNode(nodeConfig)) s.True(s.Backend.IsNodeRunning()) // make sure we are on another network indeed @@ -226,9 +217,7 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() { s.NoError(err) s.Equal(GetHeadHash(), firstHash) - nodeStopped, err = s.Backend.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.Backend.StopNode()) } // FIXME(tiabc): There's also a test with the same name in geth/node/manager_test.go @@ -245,9 +234,8 @@ func (s *APIBackendTestSuite) TestResetChainData() { EnsureNodeSync(s.Backend.NodeManager()) s.True(s.Backend.IsNodeRunning()) - nodeReady, err := s.Backend.ResetChainData() - require.NoError(err) - <-nodeReady + require.NoError(s.Backend.ResetChainData()) + s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running // make sure we can read the first byte, and it is valid (for Rinkeby) @@ -267,10 +255,7 @@ func (s *APIBackendTestSuite) TestRestartNode() { s.NoError(err) s.False(s.Backend.IsNodeRunning()) - nodeStarted, err := s.Backend.StartNode(nodeConfig) - s.NoError(err) - - <-nodeStarted // wait till node is started + s.NoError(s.Backend.StartNode(nodeConfig)) s.True(s.Backend.IsNodeRunning()) firstHash, err := e2e.FirstBlockHash(s.Backend.NodeManager()) @@ -278,9 +263,7 @@ func (s 
*APIBackendTestSuite) TestRestartNode() { s.Equal(GetHeadHash(), firstHash) s.True(s.Backend.IsNodeRunning()) - nodeRestarted, err := s.Backend.RestartNode() - require.NoError(err) - <-nodeRestarted + require.NoError(s.Backend.RestartNode()) s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running // make sure we can read the first byte, and it is valid (for Rinkeby) diff --git a/t/e2e/node/manager_test.go b/t/e2e/node/manager_test.go index 66ab74ea3..9ef6c8d1d 100644 --- a/t/e2e/node/manager_test.go +++ b/t/e2e/node/manager_test.go @@ -1,7 +1,6 @@ package node_test import ( - "encoding/json" "testing" "time" @@ -13,7 +12,7 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/params" - "github.com/status-im/status-go/geth/signal" + e2e "github.com/status-im/status-go/t/e2e" . "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -37,20 +36,6 @@ func (s *ManagerTestSuite) TestReferencesWithoutStartedNode() { initFn func() (interface{}, error) expectedErr error }{ - { - "non-null manager, no running node, RestartNode()", - func() (interface{}, error) { - return s.NodeManager.RestartNode() - }, - node.ErrNoRunningNode, - }, - { - "non-null manager, no running node, ResetChainData()", - func() (interface{}, error) { - return s.NodeManager.ResetChainData() - }, - node.ErrNoRunningNode, - }, { "non-null manager, no running node, PopulateStaticPeers()", func() (interface{}, error) { @@ -200,42 +185,30 @@ func (s *ManagerTestSuite) TestNodeStartStop() { // try stopping non-started node s.False(s.NodeManager.IsNodeRunning()) - time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - _, err = s.NodeManager.StopNode() - s.Equal(err, node.ErrNoRunningNode) + s.Equal(node.ErrNoRunningNode, s.NodeManager.StopNode()) // start node s.False(s.NodeManager.IsNodeRunning()) - 
nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) + s.NoError(s.NodeManager.StartNode(nodeConfig)) // wait till node is started - <-nodeStarted s.True(s.NodeManager.IsNodeRunning()) // try starting another node (w/o stopping the previously started node) - _, err = s.NodeManager.StartNode(nodeConfig) - s.Equal(err, node.ErrNodeExists) + s.Equal(node.ErrNodeExists, s.NodeManager.StartNode(nodeConfig)) // now stop node time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - nodeStopped, err := s.NodeManager.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.NodeManager.StopNode()) s.False(s.NodeManager.IsNodeRunning()) // start new node with exactly the same config - nodeStarted, err = s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - // wait till node is started - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) s.True(s.NodeManager.IsNodeRunning()) // finally stop the node time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - nodeStopped, err = s.NodeManager.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.NodeManager.StopNode()) } func (s *ManagerTestSuite) TestNetworkSwitching() { @@ -243,10 +216,8 @@ func (s *ManagerTestSuite) TestNetworkSwitching() { nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID()) s.NoError(err) s.False(s.NodeManager.IsNodeRunning()) - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) + s.NoError(s.NodeManager.StartNode(nodeConfig)) // wait till node is started - <-nodeStarted s.True(s.NodeManager.IsNodeRunning()) firstHash, err := e2e.FirstBlockHash(s.NodeManager) @@ -255,18 +226,13 @@ func (s *ManagerTestSuite) TestNetworkSwitching() { // now stop node, and make sure that a new node, on different network can be started time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - nodeStopped, 
err := s.NodeManager.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.NodeManager.StopNode()) s.False(s.NodeManager.IsNodeRunning()) // start new node with completely different config nodeConfig, err = e2e.MakeTestNodeConfig(params.RinkebyNetworkID) s.NoError(err) - nodeStarted, err = s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - // wait till node is started - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) s.True(s.NodeManager.IsNodeRunning()) // make sure we are on another network indeed @@ -275,9 +241,7 @@ func (s *ManagerTestSuite) TestNetworkSwitching() { s.Equal(GetHeadHashFromNetworkID(params.RinkebyNetworkID), firstHash) time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - nodeStopped, err = s.NodeManager.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.NodeManager.StopNode()) } func (s *ManagerTestSuite) TestStartNodeWithUpstreamEnabled() { @@ -294,54 +258,11 @@ func (s *ManagerTestSuite) TestStartNodeWithUpstreamEnabled() { nodeConfig.UpstreamConfig.Enabled = true nodeConfig.UpstreamConfig.URL = networkURL - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) s.True(s.NodeManager.IsNodeRunning()) time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - nodeStopped, err := s.NodeManager.StopNode() - s.NoError(err) - <-nodeStopped -} - -// TODO(adam): fix this test to not use a different directory for blockchain data -func (s *ManagerTestSuite) TestResetChainData() { - s.T().Skip() - - s.StartTestNode() - defer s.StopTestNode() - - EnsureNodeSync(s.NodeManager) - - // reset chain data - nodeReady, err := s.NodeManager.ResetChainData() - s.NoError(err) - // new node, with previous config should be running - <-nodeReady - s.True(s.NodeManager.IsNodeRunning()) - - // make sure we can read the first byte, and it is valid 
(for Rinkeby) - firstHash, err := e2e.FirstBlockHash(s.NodeManager) - s.NoError(err) - s.Equal(GetHeadHash(), firstHash) -} - -func (s *ManagerTestSuite) TestRestartNode() { - s.StartTestNode() - defer s.StopTestNode() - - s.True(s.NodeManager.IsNodeRunning()) - nodeReady, err := s.NodeManager.RestartNode() - s.NoError(err) - // new node, with previous config should be running - <-nodeReady - s.True(s.NodeManager.IsNodeRunning()) - - // make sure we can read the first byte, and it is valid (for Rinkeby) - firstHash, err := e2e.FirstBlockHash(s.NodeManager) - s.NoError(err) - s.Equal(GetHeadHash(), firstHash) + s.NoError(s.NodeManager.StopNode()) } // TODO(adam): race conditions should be tested with -race flag and unit tests, if possible. @@ -391,10 +312,10 @@ func (s *ManagerTestSuite) TestRestartNode() { // // TODO(adam): quarantined until it uses a different datadir // // as otherwise it wipes out cached blockchain data. // // func(config *params.NodeConfig) { -// // log.Info("ResetChainData()") -// // _, err := s.NodeManager.ResetChainData() -// // s.T().Logf("ResetChainData(), error: %v", err) -// // progress <- struct{}{} +// // log.Info("ResetChainData()") +// // _, err := s.NodeManager.ResetChainData() +// // s.T().Logf("ResetChainData(), error: %v", err) +// // progress <- struct{}{} // // }, // func(config *params.NodeConfig) { // log.Info("RestartNode()") @@ -468,58 +389,3 @@ func (s *ManagerTestSuite) TestRestartNode() { // <-nodeStopped // } //} - -func (s *ManagerTestSuite) TestNodeStartCrash() { - // let's listen for node.crashed signal - signalReceived := make(chan struct{}) - signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { - var envelope signal.Envelope - err := json.Unmarshal([]byte(jsonEvent), &envelope) - s.NoError(err) - - if envelope.Type == signal.EventNodeCrashed { - close(signalReceived) - } - }) - - nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID()) - s.NoError(err) - - // start node outside the manager (on the 
same port), so that manager node.Start() method fails - outsideNode, err := node.MakeNode(nodeConfig) - s.NoError(err) - err = outsideNode.Start() - s.NoError(err) - - // now try starting using node manager - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) // no error is thrown, as node is started in separate routine - <-nodeStarted // no deadlock either, as manager should close the channel on error - s.False(s.NodeManager.IsNodeRunning()) - - select { - case <-time.After(5 * time.Second): - s.FailNow("timed out waiting for signal") - case <-signalReceived: - } - - // stop outside node, and re-try - err = outsideNode.Stop() - s.NoError(err) - signalReceived = make(chan struct{}) - nodeStarted, err = s.NodeManager.StartNode(nodeConfig) - s.NoError(err) // again, no error - <-nodeStarted // no deadlock, and no signal this time, manager should be able to start node - s.True(s.NodeManager.IsNodeRunning()) - - select { - case <-time.After(5 * time.Second): - case <-signalReceived: - s.FailNow("signal should not be received") - } - - // cleanup - time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163 - s.NodeManager.StopNode() //nolint: errcheck - signal.ResetDefaultNodeNotificationHandler() -} diff --git a/t/e2e/rpc/rpc_test.go b/t/e2e/rpc/rpc_test.go index 8e5c8cbb5..d8ca2b143 100644 --- a/t/e2e/rpc/rpc_test.go +++ b/t/e2e/rpc/rpc_test.go @@ -51,9 +51,7 @@ func (s *RPCTestSuite) TestCallRPC() { nodeConfig.UpstreamConfig.URL = networkURL } - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) rpcClient := s.NodeManager.RPCClient() s.NotNil(rpcClient) @@ -122,9 +120,7 @@ func (s *RPCTestSuite) TestCallRPC() { case <-done: } - stoppedNode, err := s.NodeManager.StopNode() - s.NoError(err) - <-stoppedNode + s.NoError(s.NodeManager.StopNode()) } } @@ -133,9 +129,7 @@ func (s *RPCTestSuite) TestCallRawResult() { 
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID()) s.NoError(err) - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) client := s.NodeManager.RPCClient() s.NotNil(client) @@ -143,7 +137,7 @@ func (s *RPCTestSuite) TestCallRawResult() { jsonResult := client.CallRaw(`{"jsonrpc":"2.0","method":"shh_version","params":[],"id":67}`) s.Equal(`{"jsonrpc":"2.0","id":67,"result":"5.0"}`, jsonResult) - s.NodeManager.StopNode() //nolint: errcheck + s.NoError(s.NodeManager.StopNode()) } // TestCallRawResultGetTransactionReceipt checks if returned response @@ -153,9 +147,7 @@ func (s *RPCTestSuite) TestCallRawResultGetTransactionReceipt() { nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID()) s.NoError(err) - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) client := s.NodeManager.RPCClient() s.NotNil(client) @@ -163,7 +155,7 @@ func (s *RPCTestSuite) TestCallRawResultGetTransactionReceipt() { jsonResult := client.CallRaw(`{"jsonrpc":"2.0","method":"eth_getTransactionReceipt","params":["0x0ca0d8f2422f62bea77e24ed17db5711a77fa72064cccbb8e53c53b699cd3b34"],"id":5}`) s.Equal(`{"jsonrpc":"2.0","id":5,"result":null}`, jsonResult) - s.NodeManager.StopNode() //nolint: errcheck + s.NoError(s.NodeManager.StopNode()) } // TestCallContextResult checks if result passed to CallContext diff --git a/t/e2e/suites.go b/t/e2e/suites.go index 0529762b4..50dc4d678 100644 --- a/t/e2e/suites.go +++ b/t/e2e/suites.go @@ -6,6 +6,7 @@ import ( "github.com/status-im/status-go/geth/api" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/log" + "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" . 
"github.com/status-im/status-go/t/utils" //nolint: golint @@ -15,7 +16,7 @@ import ( // NodeManagerTestSuite defines a test suit with NodeManager. type NodeManagerTestSuite struct { suite.Suite - NodeManager common.NodeManager + NodeManager *node.NodeManager } func init() { @@ -47,10 +48,7 @@ func (s *NodeManagerTestSuite) StartTestNode(opts ...TestNodeOption) { s.NoError(importTestAccouns(nodeConfig.KeyStoreDir)) s.False(s.NodeManager.IsNodeRunning()) - nodeStarted, err := s.NodeManager.StartNode(nodeConfig) - s.NoError(err) - s.NotNil(nodeStarted) - <-nodeStarted + s.NoError(s.NodeManager.StartNode(nodeConfig)) s.True(s.NodeManager.IsNodeRunning()) } @@ -58,9 +56,7 @@ func (s *NodeManagerTestSuite) StartTestNode(opts ...TestNodeOption) { func (s *NodeManagerTestSuite) StopTestNode() { s.NotNil(s.NodeManager) s.True(s.NodeManager.IsNodeRunning()) - nodeStopped, err := s.NodeManager.StopNode() - s.NoError(err) - <-nodeStopped + s.NoError(s.NodeManager.StopNode()) s.False(s.NodeManager.IsNodeRunning()) } @@ -97,27 +93,21 @@ func (s *BackendTestSuite) StartTestBackend(opts ...TestNodeOption) { // start node s.False(s.Backend.IsNodeRunning()) - nodeStarted, err := s.Backend.StartNode(nodeConfig) - s.NoError(err) - <-nodeStarted + s.NoError(s.Backend.StartNode(nodeConfig)) s.True(s.Backend.IsNodeRunning()) } // StopTestBackend stops the node. func (s *BackendTestSuite) StopTestBackend() { s.True(s.Backend.IsNodeRunning()) - backendStopped, err := s.Backend.StopNode() - s.NoError(err) - <-backendStopped + s.NoError(s.Backend.StopNode()) s.False(s.Backend.IsNodeRunning()) } // RestartTestNode restarts a currently running node. 
func (s *BackendTestSuite) RestartTestNode() { s.True(s.Backend.IsNodeRunning()) - nodeRestarted, err := s.Backend.RestartNode() - s.NoError(err) - <-nodeRestarted + s.NoError(s.Backend.RestartNode()) s.True(s.Backend.IsNodeRunning()) } diff --git a/t/e2e/whisper/whisper_mailbox_test.go b/t/e2e/whisper/whisper_mailbox_test.go index 21ddcc395..664499c1f 100644 --- a/t/e2e/whisper/whisper_mailbox_test.go +++ b/t/e2e/whisper/whisper_mailbox_test.go @@ -312,16 +312,12 @@ func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, fun nodeConfig.DataDir = datadir s.Require().NoError(err) s.Require().False(backend.IsNodeRunning()) - nodeStarted, err := backend.StartNode(nodeConfig) - s.Require().NoError(err) - <-nodeStarted // wait till node is started + s.Require().NoError(backend.StartNode(nodeConfig)) s.Require().True(backend.IsNodeRunning()) return backend, func() { s.True(backend.IsNodeRunning()) - backendStopped, err := backend.StopNode() - s.NoError(err) - <-backendStopped + s.NoError(backend.StopNode()) s.False(backend.IsNodeRunning()) os.RemoveAll(datadir) } @@ -344,15 +340,11 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) mailboxConfig.WhisperConfig.DataDir = filepath.Join(datadir, "data") mailboxConfig.DataDir = datadir - mailboxNodeStarted, err := mailboxBackend.StartNode(mailboxConfig) - s.Require().NoError(err) - <-mailboxNodeStarted // wait till node is started + s.Require().NoError(mailboxBackend.StartNode(mailboxConfig)) s.Require().True(mailboxBackend.IsNodeRunning()) return mailboxBackend, func() { s.True(mailboxBackend.IsNodeRunning()) - backendStopped, err := mailboxBackend.StopNode() - s.NoError(err) - <-backendStopped + s.NoError(mailboxBackend.StopNode()) s.False(mailboxBackend.IsNodeRunning()) os.RemoveAll(datadir) } diff --git a/t/utils/utils.go b/t/utils/utils.go index 782e0764e..34e90f160 100644 --- a/t/utils/utils.go +++ b/t/utils/utils.go @@ -220,7 +220,7 @@ func GetAccount2PKFile() 
string { } // WaitClosed used to wait on a channel in tests -func WaitClosed(c chan struct{}, d time.Duration) error { +func WaitClosed(c <-chan struct{}, d time.Duration) error { timer := time.NewTimer(d) defer timer.Stop() select {