From 0b979c507d5f5d4e8d2aa6ee1a943ba771903fe5 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 14 Feb 2018 18:32:36 +0200 Subject: [PATCH] Fix async status api (#663) --- geth/api/backend.go | 116 +++++++++++++++++++------------------- geth/node/manager.go | 8 +-- t/e2e/api/api_test.go | 33 +++++++++++ t/e2e/api/backend_test.go | 23 ++++---- t/e2e/testing.go | 7 +++ 5 files changed, 116 insertions(+), 71 deletions(-) diff --git a/geth/api/backend.go b/geth/api/backend.go index f395ec7ec..f615f8d5b 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -52,44 +52,44 @@ func NewStatusBackend() *StatusBackend { } // NodeManager returns reference to node manager -func (m *StatusBackend) NodeManager() common.NodeManager { - return m.nodeManager +func (b *StatusBackend) NodeManager() common.NodeManager { + return b.nodeManager } // AccountManager returns reference to account manager -func (m *StatusBackend) AccountManager() common.AccountManager { - return m.accountManager +func (b *StatusBackend) AccountManager() common.AccountManager { + return b.accountManager } // JailManager returns reference to jail -func (m *StatusBackend) JailManager() common.JailManager { - return m.jailManager +func (b *StatusBackend) JailManager() common.JailManager { + return b.jailManager } // TxQueueManager returns reference to transactions manager -func (m *StatusBackend) TxQueueManager() *transactions.Manager { - return m.txQueueManager +func (b *StatusBackend) TxQueueManager() *transactions.Manager { + return b.txQueueManager } // IsNodeRunning confirm that node is running -func (m *StatusBackend) IsNodeRunning() bool { - return m.nodeManager.IsNodeRunning() +func (b *StatusBackend) IsNodeRunning() bool { + return b.nodeManager.IsNodeRunning() } // StartNode start Status node, fails if node is already started -func (m *StatusBackend) StartNode(config *params.NodeConfig) error { - m.mu.Lock() - defer m.mu.Unlock() - return m.startNode(config) +func (b *StatusBackend) StartNode(config *params.NodeConfig) error { + b.mu.Lock() + defer b.mu.Unlock() + return b.startNode(config) } -func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) { +func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("node crashed on start: %v", err) } }() - err = m.nodeManager.StartNode(config) + err = b.nodeManager.StartNode(config) if err != nil { switch err.(type) { case node.RPCClientError: @@ -108,11 +108,11 @@ func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) { signal.Send(signal.Envelope{Type: signal.EventNodeStarted}) // tx queue manager should be started after node is started, it depends // on rpc client being created - m.txQueueManager.Start() - if err := m.registerHandlers(); err != nil { + b.txQueueManager.Start() + if err := b.registerHandlers(); err != nil { log.Error("Handler registration failed", "err", err) } - if err := m.accountManager.ReSelectAccount(); err != nil { + if err := b.accountManager.ReSelectAccount(); err != nil { log.Error("Reselect account failed", "err", err) } log.Info("Account reselected") @@ -121,73 +121,75 @@ func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) { } // StopNode stop Status node. Stopped node cannot be resumed. -func (m *StatusBackend) StopNode() error { - m.mu.Lock() - defer m.mu.Unlock() - return m.stopNode() +func (b *StatusBackend) StopNode() error { + b.mu.Lock() + defer b.mu.Unlock() + return b.stopNode() } -func (m *StatusBackend) stopNode() error { - if !m.IsNodeRunning() { +func (b *StatusBackend) stopNode() error { + if !b.IsNodeRunning() { return node.ErrNoRunningNode } - m.txQueueManager.Stop() - m.jailManager.Stop() - return m.nodeManager.StopNode() + b.txQueueManager.Stop() + b.jailManager.Stop() + defer signal.Send(signal.Envelope{Type: signal.EventNodeStopped}) + return b.nodeManager.StopNode() } // RestartNode restart running Status node, fails if node is not running -func (m *StatusBackend) RestartNode() error { - if !m.IsNodeRunning() { +func (b *StatusBackend) RestartNode() error { + if !b.IsNodeRunning() { return node.ErrNoRunningNode } - config, err := m.nodeManager.NodeConfig() + config, err := b.nodeManager.NodeConfig() if err != nil { return err } newcfg := *config - if err := m.stopNode(); err != nil { + if err := b.stopNode(); err != nil { return err } - return m.startNode(&newcfg) + return b.startNode(&newcfg) } // ResetChainData remove chain data from data directory. // Node is stopped, and new node is started, with clean data directory. -func (m *StatusBackend) ResetChainData() error { - m.mu.Lock() - defer m.mu.Unlock() - config, err := m.nodeManager.NodeConfig() +func (b *StatusBackend) ResetChainData() error { + b.mu.Lock() + defer b.mu.Unlock() + config, err := b.nodeManager.NodeConfig() if err != nil { return err } newcfg := *config - if err := m.stopNode(); err != nil { + if err := b.stopNode(); err != nil { return err } - if err := m.ResetChainData(); err != nil { + // config is cleaned when node is stopped + if err := b.nodeManager.ResetChainData(&newcfg); err != nil { return err } signal.Send(signal.Envelope{Type: signal.EventChainDataRemoved}) - return m.startNode(&newcfg) + return b.startNode(&newcfg) } // CallRPC executes RPC request on node's in-proc RPC server -func (m *StatusBackend) CallRPC(inputJSON string) string { - client := m.nodeManager.RPCClient() +func (b *StatusBackend) CallRPC(inputJSON string) string { + client := b.nodeManager.RPCClient() return client.CallRaw(inputJSON) } // SendTransaction creates a new transaction and waits until it's complete. -func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { +func (b *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { if ctx == nil { ctx = context.Background() } tx := common.CreateTransaction(ctx, args) - if err = m.txQueueManager.QueueTransaction(tx); err != nil { + if err = b.txQueueManager.QueueTransaction(tx); err != nil { return hash, err } - rst := m.txQueueManager.WaitForTransaction(tx) + rst := b.txQueueManager.WaitForTransaction(tx) if rst.Error != nil { return hash, rst.Error } @@ -195,32 +197,32 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA } // CompleteTransaction instructs backend to complete sending of a given transaction -func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) { - return m.txQueueManager.CompleteTransaction(id, password) +func (b *StatusBackend) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) { + return b.txQueueManager.CompleteTransaction(id, password) } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { - return m.txQueueManager.CompleteTransactions(ids, password) +func (b *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { + return b.txQueueManager.CompleteTransactions(ids, password) } // DiscardTransaction discards a given transaction from transaction queue -func (m *StatusBackend) DiscardTransaction(id common.QueuedTxID) error { - return m.txQueueManager.DiscardTransaction(id) +func (b *StatusBackend) DiscardTransaction(id common.QueuedTxID) error { + return b.txQueueManager.DiscardTransaction(id) } // DiscardTransactions discards given multiple transactions from transaction queue -func (m *StatusBackend) DiscardTransactions(ids []common.QueuedTxID) map[common.QueuedTxID]common.RawDiscardTransactionResult { - return m.txQueueManager.DiscardTransactions(ids) +func (b *StatusBackend) DiscardTransactions(ids []common.QueuedTxID) map[common.QueuedTxID]common.RawDiscardTransactionResult { + return b.txQueueManager.DiscardTransactions(ids) } // registerHandlers attaches Status callback handlers to running node -func (m *StatusBackend) registerHandlers() error { - rpcClient := m.NodeManager().RPCClient() +func (b *StatusBackend) registerHandlers() error { + rpcClient := b.NodeManager().RPCClient() if rpcClient == nil { return node.ErrRPCClient } - rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler()) - rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler) + rpcClient.RegisterHandler("eth_accounts", b.accountManager.AccountsRPCHandler()) + rpcClient.RegisterHandler("eth_sendTransaction", b.txQueueManager.SendTransactionRPCHandler) return nil } diff --git a/geth/node/manager.go b/geth/node/manager.go index ab1dc1495..e84938cc3 100644 --- a/geth/node/manager.go +++ b/geth/node/manager.go @@ -131,13 +131,13 @@ func (m *NodeManager) stopNode() error { } // ResetChainData removes chain data if node is not running. -func (m *NodeManager) ResetChainData() error { - if !m.IsNodeRunning() { - return ErrNoRunningNode +func (m *NodeManager) ResetChainData(config *params.NodeConfig) error { + if m.IsNodeRunning() { + return ErrNodeExists } m.mu.Lock() defer m.mu.Unlock() - chainDataDir := filepath.Join(m.config.DataDir, m.config.Name, "lightchaindata") + chainDataDir := filepath.Join(config.DataDir, config.Name, "lightchaindata") if _, err := os.Stat(chainDataDir); os.IsNotExist(err) { // is it really an error, if we want to remove it as next step? return err diff --git a/t/e2e/api/api_test.go b/t/e2e/api/api_test.go index a3acea723..7e6228f25 100644 --- a/t/e2e/api/api_test.go +++ b/t/e2e/api/api_test.go @@ -196,6 +196,39 @@ func (s *APITestSuite) TestLogoutRemovesCells() { require.Error(err, "Expected that cells was removed") } +func (s *APITestSuite) TestEventsNodeStartStop() { + envelopes := make(chan signal.Envelope, 3) + signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + var envelope signal.Envelope + err := json.Unmarshal([]byte(jsonEvent), &envelope) + s.NoError(err) + envelopes <- envelope + }) + + nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID()) + s.NoError(err) + s.NoError(s.api.StartNode(nodeConfig)) + s.NoError(s.api.StopNode()) + s.verifyEnvelopes(envelopes, signal.EventNodeStarted, signal.EventNodeReady, signal.EventNodeStopped) + s.NoError(s.api.StartNode(nodeConfig)) + s.verifyEnvelopes(envelopes, signal.EventNodeStarted, signal.EventNodeReady) + s.NoError(s.api.RestartNode()) + s.verifyEnvelopes(envelopes, signal.EventNodeStopped, signal.EventNodeStarted, signal.EventNodeReady) + s.NoError(s.api.StopNode()) + s.verifyEnvelopes(envelopes, signal.EventNodeStopped) +} + +func (s *APITestSuite) verifyEnvelopes(envelopes chan signal.Envelope, envelopeTypes ...string) { + for _, envelopeType := range envelopeTypes { + select { + case env := <-envelopes: + s.Equal(envelopeType, env.Type) + case <-time.After(1 * time.Second): + s.Fail("timeout waiting for envelope") + } + } +} + func (s *APITestSuite) TestNodeStartCrash() { // let's listen for node.crashed signal signalReceived := make(chan struct{}) diff --git a/t/e2e/api/backend_test.go b/t/e2e/api/backend_test.go index e75ac432e..c66ec0a5b 100644 --- a/t/e2e/api/backend_test.go +++ b/t/e2e/api/backend_test.go @@ -1,7 +1,9 @@ package api_test import ( + "io/ioutil" "math/rand" + "os" "testing" "time" @@ -56,10 +58,10 @@ func (s *APIBackendTestSuite) TestRaceConditions() { progress <- struct{}{} }, // func(config *params.NodeConfig) { - // log.Info("ResetChainData()") - // _, err := s.Backend.ResetChainData() - // s.T().Logf("ResetChainData(), error: %v", err) - // progress <- struct{}{} + // log.Info("ResetChainData()") + // _, err := s.Backend.ResetChainData() + // s.T().Logf("ResetChainData(), error: %v", err) + // progress <- struct{}{} // }, func(config *params.NodeConfig) { log.Info("RestartNode()") @@ -220,20 +222,21 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() { s.NoError(s.Backend.StopNode()) } -// FIXME(tiabc): There's also a test with the same name in geth/node/manager_test.go -// so this test should only check StatusBackend logic with a mocked version of the underlying NodeManager. func (s *APIBackendTestSuite) TestResetChainData() { - s.T().Skip() - + if GetNetworkID() != params.StatusChainNetworkID { + s.T().Skip("test must be running on status network") + } require := s.Require() require.NotNil(s.Backend) + path, err := ioutil.TempDir("/tmp", "status-reset-chain-test") + require.NoError(err) + defer func() { s.NoError(os.RemoveAll(path)) }() - s.StartTestBackend() + s.StartTestBackend(e2e.WithDataDir(path)) defer s.StopTestBackend() EnsureNodeSync(s.Backend.NodeManager()) - s.True(s.Backend.IsNodeRunning()) require.NoError(s.Backend.ResetChainData()) s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running diff --git a/t/e2e/testing.go b/t/e2e/testing.go index 1352e7e47..14ac20690 100644 --- a/t/e2e/testing.go +++ b/t/e2e/testing.go @@ -24,6 +24,13 @@ func WithUpstream(url string) TestNodeOption { } } +// WithDataDir returns TestNodeOption that allows to set another data dir. +func WithDataDir(path string) TestNodeOption { + return func(config *params.NodeConfig) { + config.DataDir = path + } +} + // MakeTestNodeConfig defines a function to return a giving params.NodeConfig // where specific network addresses are assigned based on provieded network id. func MakeTestNodeConfig(networkID int) (*params.NodeConfig, error) {