Remove async operations from node manager (#584)

The main goal of this change is to remove async operations from node manager.
Additionally all of the signals from node manager are moved to status backend.

All of the async operation now will have the following behaviour:
- If node in the correct state exit immediatly without error
- If node not in the correct state exit immediatly with error
- In all other cases spawn a goroutine with wanted operation
- All the progress regarding that operation will be reported
  by using signals
- Signals should be handled in once place, which is StatusBackend

There are 2 potentially breaking changes:
- Empty event field will be ommited when Envelope is sent to a client
- All errors will be delivered to a client as an Envelope, previously
  some errors (NodeExists, NoRunningNode) were delivered synchronously

Signed-off-by: Dmitry Shulyak <yashulyak@gmail.com>
This commit is contained in:
Dmitry Shulyak 2018-02-09 15:37:56 +02:00 committed by Igor Mandrigin
parent 8c3aa9a619
commit 2d964bfe9f
19 changed files with 329 additions and 739 deletions

View File

@ -135,8 +135,7 @@ func (cs *commandSet) StopNode() error {
// ResetChainData removes chain data from data directory.
func (cs *commandSet) ResetChainData() error {
_, err := cs.statusAPI.ResetChainDataAsync()
return err
return cs.statusAPI.ResetChainData()
}
// CallRPC calls status node via RPC.

View File

@ -10,7 +10,6 @@ import (
"os/signal"
"runtime"
"strings"
"time"
"github.com/status-im/status-go/cmd/statusd/debug"
"github.com/status-im/status-go/geth/api"
@ -86,7 +85,7 @@ func main() {
}
backend := api.NewStatusBackend()
started, err := backend.StartNode(config)
err = backend.StartNode(config)
if err != nil {
log.Fatalf("Node start failed: %v", err)
return
@ -94,10 +93,6 @@ func main() {
// handle interrupt signals
interruptCh := haltOnInterruptSignal(backend.NodeManager())
// wait till node is started
<-started
// Check if debugging CLI connection shall be enabled.
if *cliEnabled {
err := startDebug(backend)
@ -282,30 +277,17 @@ Options:
// if the node can not be stopped.
func haltOnInterruptSignal(nodeManager common.NodeManager) <-chan struct{} {
interruptCh := make(chan struct{})
go func() {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
defer signal.Stop(signalCh)
<-signalCh
close(interruptCh)
log.Println("Got interrupt, shutting down...")
nodeStopped, err := nodeManager.StopNode()
if err != nil {
log.Printf("Failed to stop node: %v", err)
os.Exit(1)
}
select {
case <-nodeStopped:
case <-time.After(time.Second * 5):
log.Printf("Stopping node timed out")
if err := nodeManager.StopNode(); err != nil {
log.Printf("Failed to stop node: %v", err.Error())
os.Exit(1)
}
}()
return interruptCh
}

View File

@ -46,12 +46,9 @@ func syncAndStopNode(interruptCh <-chan struct{}, nodeManager common.NodeManager
return -1
}
done, err := nodeManager.StopNode()
if err != nil {
if err := nodeManager.StopNode(); err != nil {
log.Printf("syncAndStopNode: failed to stop the node: %v", err)
return 1
}
<-done
return
}

View File

@ -52,65 +52,45 @@ func (api *StatusAPI) TxQueueManager() *transactions.Manager {
// StartNode start Status node, fails if node is already started
func (api *StatusAPI) StartNode(config *params.NodeConfig) error {
nodeStarted, err := api.b.StartNode(config)
if err != nil {
return err
}
<-nodeStarted
return nil
return api.b.StartNode(config)
}
// StartNodeAsync start Status node, fails if node is already started
// Returns immediately w/o waiting for node to start (see node.ready)
func (api *StatusAPI) StartNodeAsync(config *params.NodeConfig) (<-chan struct{}, error) {
return api.b.StartNode(config)
func (api *StatusAPI) StartNodeAsync(config *params.NodeConfig) <-chan error {
return runAsync(func() error { return api.StartNode(config) })
}
// StopNode stop Status node. Stopped node cannot be resumed.
func (api *StatusAPI) StopNode() error {
nodeStopped, err := api.b.StopNode()
if err != nil {
return err
}
<-nodeStopped
return nil
return api.b.StopNode()
}
// StopNodeAsync stop Status node. Stopped node cannot be resumed.
// Returns immediately, w/o waiting for node to stop (see node.stopped)
func (api *StatusAPI) StopNodeAsync() (<-chan struct{}, error) {
return api.b.StopNode()
func (api *StatusAPI) StopNodeAsync() <-chan error {
return runAsync(api.StopNode)
}
// RestartNode restart running Status node, fails if node is not running
func (api *StatusAPI) RestartNode() error {
nodeStarted, err := api.b.RestartNode()
if err != nil {
return err
}
<-nodeStarted // do not return up until backend is ready
return nil
return api.b.RestartNode()
}
// RestartNodeAsync restart running Status node, in async manner
func (api *StatusAPI) RestartNodeAsync() (<-chan struct{}, error) {
return api.b.RestartNode()
func (api *StatusAPI) RestartNodeAsync() <-chan error {
return runAsync(api.RestartNode)
}
// ResetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory.
func (api *StatusAPI) ResetChainData() error {
nodeStarted, err := api.b.ResetChainData()
if err != nil {
return err
}
<-nodeStarted // do not return up until backend is ready
return nil
return api.b.ResetChainData()
}
// ResetChainDataAsync remove chain data from data directory, in async manner
func (api *StatusAPI) ResetChainDataAsync() (<-chan struct{}, error) {
return api.b.ResetChainData()
func (api *StatusAPI) ResetChainDataAsync() <-chan error {
return runAsync(api.ResetChainData)
}
// CallRPC executes RPC request on node's in-proc RPC server

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"fmt"
"sync"
gethcommon "github.com/ethereum/go-ethereum/common"
@ -23,9 +24,8 @@ const (
// StatusBackend implements Status.im service
type StatusBackend struct {
sync.Mutex
nodeReady chan struct{} // channel to wait for when node is fully ready
nodeManager common.NodeManager
mu sync.Mutex
nodeManager *node.NodeManager
accountManager common.AccountManager
txQueueManager *transactions.Manager
jailManager common.JailManager
@ -77,120 +77,99 @@ func (m *StatusBackend) IsNodeRunning() bool {
}
// StartNode start Status node, fails if node is already started
func (m *StatusBackend) StartNode(config *params.NodeConfig) (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
if m.nodeReady != nil {
return nil, node.ErrNodeExists
}
nodeStarted, err := m.nodeManager.StartNode(config)
if err != nil {
return nil, err
}
m.nodeReady = make(chan struct{}, 1)
go m.onNodeStart(nodeStarted, m.nodeReady) // waits on nodeStarted, writes to backendReady
return m.nodeReady, err
func (m *StatusBackend) StartNode(config *params.NodeConfig) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.startNode(config)
}
// onNodeStart does everything required to prepare backend
func (m *StatusBackend) onNodeStart(nodeStarted <-chan struct{}, backendReady chan struct{}) {
<-nodeStarted
func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("node crashed on start: %v", err)
}
}()
err = m.nodeManager.StartNode(config)
if err != nil {
switch err.(type) {
case node.RPCClientError:
err = node.ErrRPCClient
case node.EthNodeError:
err = fmt.Errorf("%v: %v", node.ErrNodeStartFailure, err)
}
signal.Send(signal.Envelope{
Type: signal.EventNodeCrashed,
Event: signal.NodeCrashEvent{
Error: err,
},
})
return err
}
signal.Send(signal.Envelope{Type: signal.EventNodeStarted})
// tx queue manager should be started after node is started, it depends
// on rpc client being created
m.txQueueManager.Start()
if err := m.registerHandlers(); err != nil {
log.Error("Handler registration failed", "err", err)
}
if err := m.accountManager.ReSelectAccount(); err != nil {
log.Error("Reselect account failed", "err", err)
}
log.Info("Account reselected")
close(backendReady)
signal.Send(signal.Envelope{
Type: signal.EventNodeReady,
Event: struct{}{},
})
signal.Send(signal.Envelope{Type: signal.EventNodeReady})
return nil
}
// StopNode stop Status node. Stopped node cannot be resumed.
func (m *StatusBackend) StopNode() (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
func (m *StatusBackend) StopNode() error {
m.mu.Lock()
defer m.mu.Unlock()
return m.stopNode()
}
if m.nodeReady == nil {
return nil, node.ErrNoRunningNode
func (m *StatusBackend) stopNode() error {
if !m.IsNodeRunning() {
return node.ErrNoRunningNode
}
<-m.nodeReady
m.txQueueManager.Stop()
m.jailManager.Stop()
nodeStopped, err := m.nodeManager.StopNode()
if err != nil {
return nil, err
}
backendStopped := make(chan struct{}, 1)
go func() {
<-nodeStopped
m.Lock()
m.nodeReady = nil
m.Unlock()
close(backendStopped)
}()
return backendStopped, nil
return m.nodeManager.StopNode()
}
// RestartNode restart running Status node, fails if node is not running
func (m *StatusBackend) RestartNode() (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
if m.nodeReady == nil {
return nil, node.ErrNoRunningNode
func (m *StatusBackend) RestartNode() error {
if !m.IsNodeRunning() {
return node.ErrNoRunningNode
}
<-m.nodeReady
nodeRestarted, err := m.nodeManager.RestartNode()
config, err := m.nodeManager.NodeConfig()
if err != nil {
return nil, err
return err
}
m.nodeReady = make(chan struct{}, 1)
go m.onNodeStart(nodeRestarted, m.nodeReady) // waits on nodeRestarted, writes to backendReady
return m.nodeReady, err
newcfg := *config
if err := m.stopNode(); err != nil {
return err
}
return m.startNode(&newcfg)
}
// ResetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory.
func (m *StatusBackend) ResetChainData() (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
if m.nodeReady == nil {
return nil, node.ErrNoRunningNode
}
<-m.nodeReady
nodeReset, err := m.nodeManager.ResetChainData()
func (m *StatusBackend) ResetChainData() error {
m.mu.Lock()
defer m.mu.Unlock()
config, err := m.nodeManager.NodeConfig()
if err != nil {
return nil, err
return err
}
m.nodeReady = make(chan struct{}, 1)
go m.onNodeStart(nodeReset, m.nodeReady) // waits on nodeReset, writes to backendReady
return m.nodeReady, err
newcfg := *config
if err := m.stopNode(); err != nil {
return err
}
if err := m.ResetChainData(); err != nil {
return err
}
signal.Send(signal.Envelope{Type: signal.EventChainDataRemoved})
return m.startNode(&newcfg)
}
// CallRPC executes RPC request on node's in-proc RPC server
@ -241,7 +220,6 @@ func (m *StatusBackend) registerHandlers() error {
if rpcClient == nil {
return node.ErrRPCClient
}
rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler())
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler)
return nil

11
geth/api/utils.go Normal file
View File

@ -0,0 +1,11 @@
package api
func runAsync(f func() error) <-chan error {
resp := make(chan error, 1)
go func() {
err := f()
resp <- err
close(resp)
}()
return resp
}

View File

@ -47,21 +47,14 @@ func (k *SelectedExtKey) Hex() string {
// NodeManager defines expected methods for managing Status node
type NodeManager interface {
// StartNode start Status node, fails if node is already started
StartNode(config *params.NodeConfig) (<-chan struct{}, error)
// StopNode stop the running Status node.
// Stopped node cannot be resumed, one starts a new node instead.
StopNode() (<-chan struct{}, error)
StartNode(config *params.NodeConfig) error
// EnsureSync waits until blockchain is synchronized.
EnsureSync(ctx context.Context) error
// RestartNode restart running Status node, fails if node is not running
RestartNode() (<-chan struct{}, error)
// ResetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory.
ResetChainData() (<-chan struct{}, error)
// StopNode stop the running Status node.
// Stopped node cannot be resumed, one starts a new node instead.
StopNode() error
// IsNodeRunning confirm that node is running
IsNodeRunning() bool

View File

@ -43,11 +43,10 @@ func (m *MockNodeManager) EXPECT() *MockNodeManagerMockRecorder {
}
// StartNode mocks base method
func (m *MockNodeManager) StartNode(config *params.NodeConfig) (<-chan struct{}, error) {
func (m *MockNodeManager) StartNode(config *params.NodeConfig) error {
ret := m.ctrl.Call(m, "StartNode", config)
ret0, _ := ret[0].(<-chan struct{})
ret1, _ := ret[1].(error)
return ret0, ret1
ret0, _ := ret[0].(error)
return ret0
}
// StartNode indicates an expected call of StartNode
@ -55,19 +54,6 @@ func (mr *MockNodeManagerMockRecorder) StartNode(config interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartNode", reflect.TypeOf((*MockNodeManager)(nil).StartNode), config)
}
// StopNode mocks base method
func (m *MockNodeManager) StopNode() (<-chan struct{}, error) {
ret := m.ctrl.Call(m, "StopNode")
ret0, _ := ret[0].(<-chan struct{})
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StopNode indicates an expected call of StopNode
func (mr *MockNodeManagerMockRecorder) StopNode() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopNode", reflect.TypeOf((*MockNodeManager)(nil).StopNode))
}
// EnsureSync mocks base method
func (m *MockNodeManager) EnsureSync(ctx context.Context) error {
ret := m.ctrl.Call(m, "EnsureSync", ctx)
@ -80,30 +66,16 @@ func (mr *MockNodeManagerMockRecorder) EnsureSync(ctx interface{}) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureSync", reflect.TypeOf((*MockNodeManager)(nil).EnsureSync), ctx)
}
// RestartNode mocks base method
func (m *MockNodeManager) RestartNode() (<-chan struct{}, error) {
ret := m.ctrl.Call(m, "RestartNode")
ret0, _ := ret[0].(<-chan struct{})
ret1, _ := ret[1].(error)
return ret0, ret1
// StopNode mocks base method
func (m *MockNodeManager) StopNode() error {
ret := m.ctrl.Call(m, "StopNode")
ret0, _ := ret[0].(error)
return ret0
}
// RestartNode indicates an expected call of RestartNode
func (mr *MockNodeManagerMockRecorder) RestartNode() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RestartNode", reflect.TypeOf((*MockNodeManager)(nil).RestartNode))
}
// ResetChainData mocks base method
func (m *MockNodeManager) ResetChainData() (<-chan struct{}, error) {
ret := m.ctrl.Call(m, "ResetChainData")
ret0, _ := ret[0].(<-chan struct{})
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ResetChainData indicates an expected call of ResetChainData
func (mr *MockNodeManagerMockRecorder) ResetChainData() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetChainData", reflect.TypeOf((*MockNodeManager)(nil).ResetChainData))
// StopNode indicates an expected call of StopNode
func (mr *MockNodeManagerMockRecorder) StopNode() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopNode", reflect.TypeOf((*MockNodeManager)(nil).StopNode))
}
// IsNodeRunning mocks base method

View File

@ -19,7 +19,6 @@ import (
"github.com/status-im/status-go/geth/mailservice"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/geth/signal"
)
// errors
@ -34,15 +33,20 @@ var (
ErrRPCClient = errors.New("failed to init RPC client")
)
// RPCClientError reported when rpc client is initialized.
type RPCClientError error
// EthNodeError is reported when node crashed on start up.
type EthNodeError error
// NodeManager manages Status node (which abstracts contained geth node)
// nolint: golint
// should be fixed at https://github.com/status-im/status-go/issues/200
type NodeManager struct {
sync.RWMutex
config *params.NodeConfig // Status node configuration
node *node.Node // reference to Geth P2P stack/node
nodeStarted chan struct{} // channel to wait for start up notifications
nodeStopped chan struct{} // channel to wait for termination notifications
mu sync.RWMutex
config *params.NodeConfig // Status node configuration
node *node.Node // reference to Geth P2P stack/node
whisperService *whisper.Whisper // reference to Whisper service
lesService *les.LightEthereum // reference to LES service
rpcClient *rpc.Client // reference to RPC client
@ -54,201 +58,131 @@ func NewNodeManager() *NodeManager {
}
// StartNode start Status node, fails if node is already started
func (m *NodeManager) StartNode(config *params.NodeConfig) (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
func (m *NodeManager) StartNode(config *params.NodeConfig) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.startNode(config)
}
// startNode start Status node, fails if node is already started
func (m *NodeManager) startNode(config *params.NodeConfig) (<-chan struct{}, error) {
if m.node != nil || m.nodeStarted != nil {
return nil, ErrNodeExists
func (m *NodeManager) startNode(config *params.NodeConfig) error {
if err := m.isNodeAvailable(); err == nil {
return ErrNodeExists
}
m.initLog(config)
ethNode, err := MakeNode(config)
if err != nil {
return nil, err
return err
}
m.node = ethNode
m.config = config
// activate MailService required for Offline Inboxing
if err := ethNode.Register(func(_ *node.ServiceContext) (node.Service, error) {
return mailservice.New(m), nil
}); err != nil {
return nil, err
return err
}
m.nodeStarted = make(chan struct{}, 1)
// start underlying node
if err := ethNode.Start(); err != nil {
return EthNodeError(err)
}
// init RPC client for this node
localRPCClient, err := m.node.Attach()
if err == nil {
m.rpcClient, err = rpc.NewClient(localRPCClient, m.config.UpstreamConfig)
}
if err != nil {
log.Error("Failed to create an RPC client", "error", err)
return RPCClientError(err)
}
// populate static peers exits when node stopped
go func() {
defer HaltOnPanic()
// start underlying node
if startErr := ethNode.Start(); startErr != nil {
close(m.nodeStarted)
m.Lock()
m.nodeStarted = nil
m.Unlock()
signal.Send(signal.Envelope{
Type: signal.EventNodeCrashed,
Event: signal.NodeCrashEvent{
Error: fmt.Errorf("%v: %v", ErrNodeStartFailure, startErr),
},
})
return
if err := m.PopulateStaticPeers(); err != nil {
log.Error("Static peers population", "error", err)
}
m.Lock()
m.node = ethNode
m.nodeStopped = make(chan struct{}, 1)
m.config = config
// init RPC client for this node
localRPCClient, errRPC := m.node.Attach()
if errRPC == nil {
m.rpcClient, errRPC = rpc.NewClient(localRPCClient, m.config.UpstreamConfig)
}
if errRPC != nil {
log.Error("Failed to create an RPC client", "error", errRPC)
m.Unlock()
signal.Send(signal.Envelope{
Type: signal.EventNodeCrashed,
Event: signal.NodeCrashEvent{
Error: ErrRPCClient,
},
})
return
}
m.Unlock()
// underlying node is started, every method can use it, we use it immediately
go func() {
if err := m.PopulateStaticPeers(); err != nil {
log.Error("Static peers population", "error", err)
}
}()
// notify all subscribers that Status node is started
close(m.nodeStarted)
signal.Send(signal.Envelope{
Type: signal.EventNodeStarted,
Event: struct{}{},
})
// wait up until underlying node is stopped
m.node.Wait()
// notify m.Stop() that node has been stopped
close(m.nodeStopped)
log.Info("Node is stopped")
}()
return m.nodeStarted, nil
return nil
}
// StopNode stop Status node. Stopped node cannot be resumed.
func (m *NodeManager) StopNode() (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
if m.nodeStopped == nil {
return nil, ErrNoRunningNode
}
<-m.nodeStarted // make sure you operate on fully started node
func (m *NodeManager) StopNode() error {
m.mu.Lock()
defer m.mu.Unlock()
return m.stopNode()
}
// stopNode stop Status node. Stopped node cannot be resumed.
func (m *NodeManager) stopNode() (<-chan struct{}, error) {
// now attempt to stop
if err := m.node.Stop(); err != nil {
return nil, err
func (m *NodeManager) stopNode() error {
if err := m.isNodeAvailable(); err != nil {
return err
}
if err := m.node.Stop(); err != nil {
return err
}
m.node = nil
m.config = nil
m.lesService = nil
m.whisperService = nil
m.rpcClient = nil
return nil
}
nodeStopped := make(chan struct{}, 1)
go func() {
<-m.nodeStopped // Status node is stopped (code after Wait() is executed)
log.Info("Ready to reset node")
// reset node params
m.Lock()
m.config = nil
m.lesService = nil
m.whisperService = nil
m.rpcClient = nil
m.nodeStarted = nil
m.node = nil
m.Unlock()
close(nodeStopped) // Status node is stopped, and we can create another
log.Info("Node manager resets node params")
// notify application that it can send more requests now
signal.Send(signal.Envelope{
Type: signal.EventNodeStopped,
Event: struct{}{},
})
log.Info("Node manager notifed app, that node has stopped")
}()
return nodeStopped, nil
// ResetChainData removes chain data if node is not running.
func (m *NodeManager) ResetChainData() error {
if !m.IsNodeRunning() {
return ErrNoRunningNode
}
m.mu.Lock()
defer m.mu.Unlock()
chainDataDir := filepath.Join(m.config.DataDir, m.config.Name, "lightchaindata")
if _, err := os.Stat(chainDataDir); os.IsNotExist(err) {
// is it really an error, if we want to remove it as next step?
return err
}
err := os.RemoveAll(chainDataDir)
if err == nil {
log.Info("Chain data has been removed", "dir", chainDataDir)
}
return err
}
// IsNodeRunning confirm that node is running
func (m *NodeManager) IsNodeRunning() bool {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return false
}
<-m.nodeStarted
return true
}
// Node returns underlying Status node
func (m *NodeManager) Node() (*node.Node, error) {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
return m.node, nil
}
// PopulateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
func (m *NodeManager) PopulateStaticPeers() error {
m.RLock()
defer m.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return err
}
<-m.nodeStarted
m.mu.RLock()
defer m.mu.RUnlock()
return m.populateStaticPeers()
}
// populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
func (m *NodeManager) populateStaticPeers() error {
if err := m.isNodeAvailable(); err != nil {
return err
}
if !m.config.BootClusterConfig.Enabled {
log.Info("Boot cluster is disabled")
return nil
@ -268,211 +202,107 @@ func (m *NodeManager) populateStaticPeers() error {
// AddPeer adds new static peer node
func (m *NodeManager) AddPeer(url string) error {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return err
}
<-m.nodeStarted
return m.addPeer(url)
}
// addPeer adds new static peer node
func (m *NodeManager) addPeer(url string) error {
server := m.node.Server()
if server == nil {
return ErrNoRunningNode
}
// Try to add the url as a static peer and return
parsedNode, err := discover.ParseNode(url)
if err != nil {
return err
}
server.AddPeer(parsedNode)
m.node.Server().AddPeer(parsedNode)
return nil
}
// PeerCount returns the number of connected peers.
func (m *NodeManager) PeerCount() int {
if m.node == nil || m.node.Server() == nil {
if !m.IsNodeRunning() {
return 0
}
return m.node.Server().PeerCount()
}
// ResetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory.
func (m *NodeManager) ResetChainData() (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
return m.resetChainData()
}
// resetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory.
func (m *NodeManager) resetChainData() (<-chan struct{}, error) {
prevConfig := *m.config
nodeStopped, err := m.stopNode()
if err != nil {
return nil, err
}
m.Unlock()
<-nodeStopped
m.Lock()
chainDataDir := filepath.Join(prevConfig.DataDir, prevConfig.Name, "lightchaindata")
if _, err := os.Stat(chainDataDir); os.IsNotExist(err) {
return nil, err
}
if err := os.RemoveAll(chainDataDir); err != nil {
return nil, err
}
// send signal up to native app
signal.Send(signal.Envelope{
Type: signal.EventChainDataRemoved,
Event: struct{}{},
})
log.Info("Chain data has been removed", "dir", chainDataDir)
return m.startNode(&prevConfig)
}
// RestartNode restart running Status node, fails if node is not running
func (m *NodeManager) RestartNode() (<-chan struct{}, error) {
m.Lock()
defer m.Unlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
return m.restartNode()
}
// restartNode restart running Status node, fails if node is not running
func (m *NodeManager) restartNode() (<-chan struct{}, error) {
prevConfig := *m.config
nodeStopped, err := m.stopNode()
if err != nil {
return nil, err
}
m.Unlock()
<-nodeStopped
m.Lock()
return m.startNode(&prevConfig)
}
// NodeConfig exposes reference to running node's configuration
func (m *NodeManager) NodeConfig() (*params.NodeConfig, error) {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
return m.config, nil
}
// LightEthereumService exposes reference to LES service running on top of the node
func (m *NodeManager) LightEthereumService() (*les.LightEthereum, error) {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
if m.lesService == nil {
if err := m.node.Service(&m.lesService); err != nil {
log.Warn("Cannot obtain LES service", "error", err)
return nil, ErrInvalidLightEthereumService
}
}
if m.lesService == nil {
return nil, ErrInvalidLightEthereumService
}
return m.lesService, nil
}
// WhisperService exposes reference to Whisper service running on top of the node
func (m *NodeManager) WhisperService() (*whisper.Whisper, error) {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
if m.whisperService == nil {
if err := m.node.Service(&m.whisperService); err != nil {
log.Warn("Cannot obtain whisper service", "error", err)
return nil, ErrInvalidWhisperService
}
}
if m.whisperService == nil {
return nil, ErrInvalidWhisperService
}
return m.whisperService, nil
}
// AccountManager exposes reference to node's accounts manager
func (m *NodeManager) AccountManager() (*accounts.Manager, error) {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
accountManager := m.node.AccountManager()
if accountManager == nil {
return nil, ErrInvalidAccountManager
}
return accountManager, nil
}
// AccountKeyStore exposes reference to accounts key store
func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
<-m.nodeStarted
accountManager := m.node.AccountManager()
if accountManager == nil {
return nil, ErrInvalidAccountManager
@ -493,9 +323,8 @@ func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) {
// RPCClient exposes reference to RPC client connected to the running node.
func (m *NodeManager) RPCClient() *rpc.Client {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
return m.rpcClient
}
@ -514,10 +343,9 @@ func (m *NodeManager) initLog(config *params.NodeConfig) {
// isNodeAvailable check if we have a node running and make sure is fully started
func (m *NodeManager) isNodeAvailable() error {
if m.nodeStarted == nil || m.node == nil {
if m.node == nil || m.node.Server() == nil {
return ErrNoRunningNode
}
return nil
}

View File

@ -1,25 +0,0 @@
package node
import (
"fmt"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/signal"
)
// HaltOnPanic recovers from panic, logs issue, sends upward notification, and exits
func HaltOnPanic() {
if r := recover(); r != nil {
err := fmt.Errorf("%v: %v", ErrNodeRunFailure, r)
// send signal up to native app
signal.Send(signal.Envelope{
Type: signal.EventNodeCrashed,
Event: signal.NodeCrashEvent{
Error: err,
},
})
common.Fatalf(ErrNodeRunFailure, r) // os.exit(1) is called internally
}
}

View File

@ -38,15 +38,15 @@ func StartNode(configJSON *C.char) *C.char {
return makeJSONResponse(err)
}
_, err = statusAPI.StartNodeAsync(config)
return makeJSONResponse(err)
statusAPI.StartNodeAsync(config)
return makeJSONResponse(nil)
}
//StopNode - stop status node
//export StopNode
func StopNode() *C.char {
_, err := statusAPI.StopNodeAsync()
return makeJSONResponse(err)
statusAPI.StopNodeAsync()
return makeJSONResponse(nil)
}
//ValidateNodeConfig validates config for status node
@ -95,8 +95,8 @@ func ValidateNodeConfig(configJSON *C.char) *C.char {
//ResetChainData remove chain data from data directory
//export ResetChainData
func ResetChainData() *C.char {
_, err := statusAPI.ResetChainDataAsync()
return makeJSONResponse(err)
statusAPI.ResetChainDataAsync()
return makeJSONResponse(nil)
}
//CallRPC calls status node via rpc

View File

@ -254,9 +254,7 @@ func (s *AccountsTestSuite) TestSelectedAccountOnRestart() {
nodeConfig, err := s.Backend.NodeManager().NodeConfig()
s.NoError(err)
preservedNodeConfig := *nodeConfig
nodeStoped, err := s.Backend.StopNode()
s.NoError(err)
<-nodeStoped
s.NoError(s.Backend.StopNode())
// make sure that account is still selected
selectedAccount, err = s.Backend.AccountManager().SelectedAccount()
@ -265,9 +263,7 @@ func (s *AccountsTestSuite) TestSelectedAccountOnRestart() {
s.Equal(selectedAccount.Address.Hex(), address2, "incorrect address selected")
// resume node
nodeStarted, err := s.Backend.StartNode(&preservedNodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.Backend.StartNode(&preservedNodeConfig))
// re-check selected account (account2 MUST be selected)
selectedAccount, err = s.Backend.AccountManager().SelectedAccount()

View File

@ -1,6 +1,7 @@
package api_test
import (
"encoding/json"
"io/ioutil"
"math/rand"
"os"
@ -10,7 +11,9 @@ import (
"github.com/status-im/status-go/geth/api"
"github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
e2e "github.com/status-im/status-go/t/e2e"
. "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
@ -29,6 +32,12 @@ type APITestSuite struct {
api *api.StatusAPI
}
func (s *APITestSuite) ensureNodeStopped() {
if err := s.api.StopNode(); err != node.ErrNoRunningNode && err != nil {
s.NoError(err, "unexpected error")
}
}
func (s *APITestSuite) SetupTest() {
s.api = api.NewStatusAPI()
s.NotNil(s.api)
@ -67,29 +76,29 @@ func (s *APITestSuite) TestRaceConditions() {
var funcsToTest = []func(*params.NodeConfig){
func(config *params.NodeConfig) {
log.Info("StartNodeAsync()")
_, err := s.api.StartNodeAsync(config)
s.T().Logf("StartNodeAsync() for network: %d, error: %v", config.NetworkID, err)
s.api.StartNodeAsync(config)
s.T().Logf("StartNodeAsync() for network: %d", config.NetworkID)
progress <- struct{}{}
},
func(config *params.NodeConfig) {
log.Info("StopNodeAsync()")
_, err := s.api.StopNodeAsync()
s.T().Logf("StopNodeAsync(), error: %v", err)
s.api.StopNodeAsync()
s.T().Logf("StopNodeAsync()")
progress <- struct{}{}
},
func(config *params.NodeConfig) {
log.Info("RestartNodeAsync()")
_, err := s.api.RestartNodeAsync()
s.T().Logf("RestartNodeAsync(), error: %v", err)
s.api.RestartNodeAsync()
s.T().Logf("RestartNodeAsync()")
progress <- struct{}{}
},
// TODO(adam): quarantined until it uses a different datadir
// as otherwise it wipes out cached blockchain data.
// func(config *params.NodeConfig) {
// log.Info("ResetChainDataAsync()")
// _, err := s.api.ResetChainDataAsync()
// s.T().Logf("ResetChainDataAsync(), error: %v", err)
// progress <- struct{}{}
// log.Info("ResetChainDataAsync()")
// _, err := s.api.ResetChainDataAsync()
// s.T().Logf("ResetChainDataAsync(), error: %v", err)
// progress <- struct{}{}
// },
}
@ -117,7 +126,7 @@ func (s *APITestSuite) TestRaceConditions() {
time.Sleep(2 * time.Second) // so that we see some logs
// just in case we have a node running
s.api.StopNode() //nolint: errcheck
s.ensureNodeStopped()
}
func (s *APITestSuite) TestCellsRemovedAfterSwitchAccount() {
@ -186,3 +195,50 @@ func (s *APITestSuite) TestLogoutRemovesCells() {
_, err = s.api.JailManager().Cell(testChatID)
require.Error(err, "Expected that cells was removed")
}
func (s *APITestSuite) TestNodeStartCrash() {
// let's listen for node.crashed signal
signalReceived := make(chan struct{})
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err)
if envelope.Type == signal.EventNodeCrashed {
close(signalReceived)
}
})
defer signal.ResetDefaultNodeNotificationHandler()
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
// start node outside the manager (on the same port), so that manager node.Start() method fails
outsideNode, err := node.MakeNode(nodeConfig)
s.NoError(err)
err = outsideNode.Start()
s.NoError(err)
// now try starting using node manager, it should fail (error is irrelevant as it is implementation detail)
s.Error(<-s.api.StartNodeAsync(nodeConfig))
select {
case <-time.After(500 * time.Millisecond):
s.FailNow("timed out waiting for signal")
case <-signalReceived:
}
// stop outside node, and re-try
s.NoError(outsideNode.Stop())
signalReceived = make(chan struct{})
s.NoError(<-s.api.StartNodeAsync(nodeConfig))
select {
case <-time.After(500 * time.Millisecond):
case <-signalReceived:
s.FailNow("signal should not be received")
}
// cleanup
s.NoError(s.api.StopNode())
}

View File

@ -45,13 +45,13 @@ func (s *APIBackendTestSuite) TestRaceConditions() {
var funcsToTest = []func(*params.NodeConfig){
func(config *params.NodeConfig) {
log.Info("StartNode()")
_, err := s.Backend.StartNode(config)
err := s.Backend.StartNode(config)
s.T().Logf("StartNode() for network: %d, error: %v", config.NetworkID, err)
progress <- struct{}{}
},
func(config *params.NodeConfig) {
log.Info("StopNode()")
_, err := s.Backend.StopNode()
err := s.Backend.StopNode()
s.T().Logf("StopNode() for network: %d, error: %v", config.NetworkID, err)
progress <- struct{}{}
},
@ -63,7 +63,7 @@ func (s *APIBackendTestSuite) TestRaceConditions() {
// },
func(config *params.NodeConfig) {
log.Info("RestartNode()")
_, err := s.Backend.RestartNode()
err := s.Backend.RestartNode()
s.T().Logf("RestartNode(), error: %v", err)
progress <- struct{}{}
},
@ -174,16 +174,15 @@ func (s *APIBackendTestSuite) TestRaceConditions() {
}
for range progress {
cnt -= 1
cnt--
if cnt <= 0 {
break
}
}
time.Sleep(2 * time.Second) // so that we see some logs
nodeStopped, _ := s.Backend.StopNode() // just in case we have a node running
if nodeStopped != nil {
<-nodeStopped
time.Sleep(2 * time.Second) // so that we see some logs
if err := s.Backend.StopNode(); err != node.ErrNoRunningNode && err != nil {
s.NoError(err, "unexpected error")
}
}
@ -195,10 +194,7 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() {
s.NoError(err)
s.False(s.Backend.IsNodeRunning())
nodeStarted, err := s.Backend.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted // wait till node is started
s.NoError(s.Backend.StartNode(nodeConfig))
s.True(s.Backend.IsNodeRunning())
firstHash, err := e2e.FirstBlockHash(s.Backend.NodeManager())
@ -206,19 +202,14 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() {
s.Equal(GetHeadHash(), firstHash)
// now stop node, and make sure that a new node, on different network can be started
nodeStopped, err := s.Backend.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.Backend.StopNode())
// start new node with completely different config
nodeConfig, err = e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
s.False(s.Backend.IsNodeRunning())
nodeStarted, err = s.Backend.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.Backend.StartNode(nodeConfig))
s.True(s.Backend.IsNodeRunning())
// make sure we are on another network indeed
@ -226,9 +217,7 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() {
s.NoError(err)
s.Equal(GetHeadHash(), firstHash)
nodeStopped, err = s.Backend.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.Backend.StopNode())
}
// FIXME(tiabc): There's also a test with the same name in geth/node/manager_test.go
@ -245,9 +234,8 @@ func (s *APIBackendTestSuite) TestResetChainData() {
EnsureNodeSync(s.Backend.NodeManager())
s.True(s.Backend.IsNodeRunning())
nodeReady, err := s.Backend.ResetChainData()
require.NoError(err)
<-nodeReady
require.NoError(s.Backend.ResetChainData())
s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running
// make sure we can read the first byte, and it is valid (for Rinkeby)
@ -267,10 +255,7 @@ func (s *APIBackendTestSuite) TestRestartNode() {
s.NoError(err)
s.False(s.Backend.IsNodeRunning())
nodeStarted, err := s.Backend.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted // wait till node is started
s.NoError(s.Backend.StartNode(nodeConfig))
s.True(s.Backend.IsNodeRunning())
firstHash, err := e2e.FirstBlockHash(s.Backend.NodeManager())
@ -278,9 +263,7 @@ func (s *APIBackendTestSuite) TestRestartNode() {
s.Equal(GetHeadHash(), firstHash)
s.True(s.Backend.IsNodeRunning())
nodeRestarted, err := s.Backend.RestartNode()
require.NoError(err)
<-nodeRestarted
require.NoError(s.Backend.RestartNode())
s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running
// make sure we can read the first byte, and it is valid (for Rinkeby)

View File

@ -1,7 +1,6 @@
package node_test
import (
"encoding/json"
"testing"
"time"
@ -13,7 +12,7 @@ import (
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
e2e "github.com/status-im/status-go/t/e2e"
. "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
@ -37,20 +36,6 @@ func (s *ManagerTestSuite) TestReferencesWithoutStartedNode() {
initFn func() (interface{}, error)
expectedErr error
}{
{
"non-null manager, no running node, RestartNode()",
func() (interface{}, error) {
return s.NodeManager.RestartNode()
},
node.ErrNoRunningNode,
},
{
"non-null manager, no running node, ResetChainData()",
func() (interface{}, error) {
return s.NodeManager.ResetChainData()
},
node.ErrNoRunningNode,
},
{
"non-null manager, no running node, PopulateStaticPeers()",
func() (interface{}, error) {
@ -200,42 +185,30 @@ func (s *ManagerTestSuite) TestNodeStartStop() {
// try stopping non-started node
s.False(s.NodeManager.IsNodeRunning())
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
_, err = s.NodeManager.StopNode()
s.Equal(err, node.ErrNoRunningNode)
s.Equal(node.ErrNoRunningNode, s.NodeManager.StopNode())
// start node
s.False(s.NodeManager.IsNodeRunning())
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
s.NoError(s.NodeManager.StartNode(nodeConfig))
// wait till node is started
<-nodeStarted
s.True(s.NodeManager.IsNodeRunning())
// try starting another node (w/o stopping the previously started node)
_, err = s.NodeManager.StartNode(nodeConfig)
s.Equal(err, node.ErrNodeExists)
s.Equal(node.ErrNodeExists, s.NodeManager.StartNode(nodeConfig))
// now stop node
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
nodeStopped, err := s.NodeManager.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.NodeManager.StopNode())
s.False(s.NodeManager.IsNodeRunning())
// start new node with exactly the same config
nodeStarted, err = s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
// wait till node is started
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
s.True(s.NodeManager.IsNodeRunning())
// finally stop the node
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
nodeStopped, err = s.NodeManager.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.NodeManager.StopNode())
}
func (s *ManagerTestSuite) TestNetworkSwitching() {
@ -243,10 +216,8 @@ func (s *ManagerTestSuite) TestNetworkSwitching() {
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
s.False(s.NodeManager.IsNodeRunning())
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
s.NoError(s.NodeManager.StartNode(nodeConfig))
// wait till node is started
<-nodeStarted
s.True(s.NodeManager.IsNodeRunning())
firstHash, err := e2e.FirstBlockHash(s.NodeManager)
@ -255,18 +226,13 @@ func (s *ManagerTestSuite) TestNetworkSwitching() {
// now stop node, and make sure that a new node, on different network can be started
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
nodeStopped, err := s.NodeManager.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.NodeManager.StopNode())
s.False(s.NodeManager.IsNodeRunning())
// start new node with completely different config
nodeConfig, err = e2e.MakeTestNodeConfig(params.RinkebyNetworkID)
s.NoError(err)
nodeStarted, err = s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
// wait till node is started
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
s.True(s.NodeManager.IsNodeRunning())
// make sure we are on another network indeed
@ -275,9 +241,7 @@ func (s *ManagerTestSuite) TestNetworkSwitching() {
s.Equal(GetHeadHashFromNetworkID(params.RinkebyNetworkID), firstHash)
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
nodeStopped, err = s.NodeManager.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.NodeManager.StopNode())
}
func (s *ManagerTestSuite) TestStartNodeWithUpstreamEnabled() {
@ -294,54 +258,11 @@ func (s *ManagerTestSuite) TestStartNodeWithUpstreamEnabled() {
nodeConfig.UpstreamConfig.Enabled = true
nodeConfig.UpstreamConfig.URL = networkURL
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
s.True(s.NodeManager.IsNodeRunning())
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
nodeStopped, err := s.NodeManager.StopNode()
s.NoError(err)
<-nodeStopped
}
// TODO(adam): fix this test to not use a different directory for blockchain data
func (s *ManagerTestSuite) TestResetChainData() {
s.T().Skip()
s.StartTestNode()
defer s.StopTestNode()
EnsureNodeSync(s.NodeManager)
// reset chain data
nodeReady, err := s.NodeManager.ResetChainData()
s.NoError(err)
// new node, with previous config should be running
<-nodeReady
s.True(s.NodeManager.IsNodeRunning())
// make sure we can read the first byte, and it is valid (for Rinkeby)
firstHash, err := e2e.FirstBlockHash(s.NodeManager)
s.NoError(err)
s.Equal(GetHeadHash(), firstHash)
}
func (s *ManagerTestSuite) TestRestartNode() {
s.StartTestNode()
defer s.StopTestNode()
s.True(s.NodeManager.IsNodeRunning())
nodeReady, err := s.NodeManager.RestartNode()
s.NoError(err)
// new node, with previous config should be running
<-nodeReady
s.True(s.NodeManager.IsNodeRunning())
// make sure we can read the first byte, and it is valid (for Rinkeby)
firstHash, err := e2e.FirstBlockHash(s.NodeManager)
s.NoError(err)
s.Equal(GetHeadHash(), firstHash)
s.NoError(s.NodeManager.StopNode())
}
// TODO(adam): race conditions should be tested with -race flag and unit tests, if possible.
@ -391,10 +312,10 @@ func (s *ManagerTestSuite) TestRestartNode() {
// // TODO(adam): quarantined until it uses a different datadir
// // as otherwise it wipes out cached blockchain data.
// // func(config *params.NodeConfig) {
// // log.Info("ResetChainData()")
// // _, err := s.NodeManager.ResetChainData()
// // s.T().Logf("ResetChainData(), error: %v", err)
// // progress <- struct{}{}
// // log.Info("ResetChainData()")
// // _, err := s.NodeManager.ResetChainData()
// // s.T().Logf("ResetChainData(), error: %v", err)
// // progress <- struct{}{}
// // },
// func(config *params.NodeConfig) {
// log.Info("RestartNode()")
@ -468,58 +389,3 @@ func (s *ManagerTestSuite) TestRestartNode() {
// <-nodeStopped
// }
//}
func (s *ManagerTestSuite) TestNodeStartCrash() {
// let's listen for node.crashed signal
signalReceived := make(chan struct{})
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err)
if envelope.Type == signal.EventNodeCrashed {
close(signalReceived)
}
})
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
// start node outside the manager (on the same port), so that manager node.Start() method fails
outsideNode, err := node.MakeNode(nodeConfig)
s.NoError(err)
err = outsideNode.Start()
s.NoError(err)
// now try starting using node manager
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err) // no error is thrown, as node is started in separate routine
<-nodeStarted // no deadlock either, as manager should close the channel on error
s.False(s.NodeManager.IsNodeRunning())
select {
case <-time.After(5 * time.Second):
s.FailNow("timed out waiting for signal")
case <-signalReceived:
}
// stop outside node, and re-try
err = outsideNode.Stop()
s.NoError(err)
signalReceived = make(chan struct{})
nodeStarted, err = s.NodeManager.StartNode(nodeConfig)
s.NoError(err) // again, no error
<-nodeStarted // no deadlock, and no signal this time, manager should be able to start node
s.True(s.NodeManager.IsNodeRunning())
select {
case <-time.After(5 * time.Second):
case <-signalReceived:
s.FailNow("signal should not be received")
}
// cleanup
time.Sleep(100 * time.Millisecond) //https://github.com/status-im/status-go/issues/429#issuecomment-339663163
s.NodeManager.StopNode() //nolint: errcheck
signal.ResetDefaultNodeNotificationHandler()
}

View File

@ -51,9 +51,7 @@ func (s *RPCTestSuite) TestCallRPC() {
nodeConfig.UpstreamConfig.URL = networkURL
}
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
rpcClient := s.NodeManager.RPCClient()
s.NotNil(rpcClient)
@ -122,9 +120,7 @@ func (s *RPCTestSuite) TestCallRPC() {
case <-done:
}
stoppedNode, err := s.NodeManager.StopNode()
s.NoError(err)
<-stoppedNode
s.NoError(s.NodeManager.StopNode())
}
}
@ -133,9 +129,7 @@ func (s *RPCTestSuite) TestCallRawResult() {
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
client := s.NodeManager.RPCClient()
s.NotNil(client)
@ -143,7 +137,7 @@ func (s *RPCTestSuite) TestCallRawResult() {
jsonResult := client.CallRaw(`{"jsonrpc":"2.0","method":"shh_version","params":[],"id":67}`)
s.Equal(`{"jsonrpc":"2.0","id":67,"result":"5.0"}`, jsonResult)
s.NodeManager.StopNode() //nolint: errcheck
s.NoError(s.NodeManager.StopNode())
}
// TestCallRawResultGetTransactionReceipt checks if returned response
@ -153,9 +147,7 @@ func (s *RPCTestSuite) TestCallRawResultGetTransactionReceipt() {
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
client := s.NodeManager.RPCClient()
s.NotNil(client)
@ -163,7 +155,7 @@ func (s *RPCTestSuite) TestCallRawResultGetTransactionReceipt() {
jsonResult := client.CallRaw(`{"jsonrpc":"2.0","method":"eth_getTransactionReceipt","params":["0x0ca0d8f2422f62bea77e24ed17db5711a77fa72064cccbb8e53c53b699cd3b34"],"id":5}`)
s.Equal(`{"jsonrpc":"2.0","id":5,"result":null}`, jsonResult)
s.NodeManager.StopNode() //nolint: errcheck
s.NoError(s.NodeManager.StopNode())
}
// TestCallContextResult checks if result passed to CallContext

View File

@ -6,6 +6,7 @@ import (
"github.com/status-im/status-go/geth/api"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions"
. "github.com/status-im/status-go/t/utils" //nolint: golint
@ -15,7 +16,7 @@ import (
// NodeManagerTestSuite defines a test suit with NodeManager.
type NodeManagerTestSuite struct {
suite.Suite
NodeManager common.NodeManager
NodeManager *node.NodeManager
}
func init() {
@ -47,10 +48,7 @@ func (s *NodeManagerTestSuite) StartTestNode(opts ...TestNodeOption) {
s.NoError(importTestAccouns(nodeConfig.KeyStoreDir))
s.False(s.NodeManager.IsNodeRunning())
nodeStarted, err := s.NodeManager.StartNode(nodeConfig)
s.NoError(err)
s.NotNil(nodeStarted)
<-nodeStarted
s.NoError(s.NodeManager.StartNode(nodeConfig))
s.True(s.NodeManager.IsNodeRunning())
}
@ -58,9 +56,7 @@ func (s *NodeManagerTestSuite) StartTestNode(opts ...TestNodeOption) {
func (s *NodeManagerTestSuite) StopTestNode() {
s.NotNil(s.NodeManager)
s.True(s.NodeManager.IsNodeRunning())
nodeStopped, err := s.NodeManager.StopNode()
s.NoError(err)
<-nodeStopped
s.NoError(s.NodeManager.StopNode())
s.False(s.NodeManager.IsNodeRunning())
}
@ -97,27 +93,21 @@ func (s *BackendTestSuite) StartTestBackend(opts ...TestNodeOption) {
// start node
s.False(s.Backend.IsNodeRunning())
nodeStarted, err := s.Backend.StartNode(nodeConfig)
s.NoError(err)
<-nodeStarted
s.NoError(s.Backend.StartNode(nodeConfig))
s.True(s.Backend.IsNodeRunning())
}
// StopTestBackend stops the node.
func (s *BackendTestSuite) StopTestBackend() {
s.True(s.Backend.IsNodeRunning())
backendStopped, err := s.Backend.StopNode()
s.NoError(err)
<-backendStopped
s.NoError(s.Backend.StopNode())
s.False(s.Backend.IsNodeRunning())
}
// RestartTestNode restarts a currently running node.
func (s *BackendTestSuite) RestartTestNode() {
s.True(s.Backend.IsNodeRunning())
nodeRestarted, err := s.Backend.RestartNode()
s.NoError(err)
<-nodeRestarted
s.NoError(s.Backend.RestartNode())
s.True(s.Backend.IsNodeRunning())
}

View File

@ -312,16 +312,12 @@ func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, fun
nodeConfig.DataDir = datadir
s.Require().NoError(err)
s.Require().False(backend.IsNodeRunning())
nodeStarted, err := backend.StartNode(nodeConfig)
s.Require().NoError(err)
<-nodeStarted // wait till node is started
s.Require().NoError(backend.StartNode(nodeConfig))
s.Require().True(backend.IsNodeRunning())
return backend, func() {
s.True(backend.IsNodeRunning())
backendStopped, err := backend.StopNode()
s.NoError(err)
<-backendStopped
s.NoError(backend.StopNode())
s.False(backend.IsNodeRunning())
os.RemoveAll(datadir)
}
@ -344,15 +340,11 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func())
mailboxConfig.WhisperConfig.DataDir = filepath.Join(datadir, "data")
mailboxConfig.DataDir = datadir
mailboxNodeStarted, err := mailboxBackend.StartNode(mailboxConfig)
s.Require().NoError(err)
<-mailboxNodeStarted // wait till node is started
s.Require().NoError(mailboxBackend.StartNode(mailboxConfig))
s.Require().True(mailboxBackend.IsNodeRunning())
return mailboxBackend, func() {
s.True(mailboxBackend.IsNodeRunning())
backendStopped, err := mailboxBackend.StopNode()
s.NoError(err)
<-backendStopped
s.NoError(mailboxBackend.StopNode())
s.False(mailboxBackend.IsNodeRunning())
os.RemoveAll(datadir)
}

View File

@ -220,7 +220,7 @@ func GetAccount2PKFile() string {
}
// WaitClosed used to wait on a channel in tests
func WaitClosed(c chan struct{}, d time.Duration) error {
func WaitClosed(c <-chan struct{}, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
select {