Fix async status api (#663)

This commit is contained in:
Dmitry Shulyak 2018-02-14 18:32:36 +02:00 committed by Igor Mandrigin
parent adbc7b5ae6
commit 0b979c507d
5 changed files with 116 additions and 71 deletions

View File

@ -52,44 +52,44 @@ func NewStatusBackend() *StatusBackend {
} }
// NodeManager returns reference to node manager // NodeManager returns reference to node manager
func (m *StatusBackend) NodeManager() common.NodeManager { func (b *StatusBackend) NodeManager() common.NodeManager {
return m.nodeManager return b.nodeManager
} }
// AccountManager returns reference to account manager // AccountManager returns reference to account manager
func (m *StatusBackend) AccountManager() common.AccountManager { func (b *StatusBackend) AccountManager() common.AccountManager {
return m.accountManager return b.accountManager
} }
// JailManager returns reference to jail // JailManager returns reference to jail
func (m *StatusBackend) JailManager() common.JailManager { func (b *StatusBackend) JailManager() common.JailManager {
return m.jailManager return b.jailManager
} }
// TxQueueManager returns reference to transactions manager // TxQueueManager returns reference to transactions manager
func (m *StatusBackend) TxQueueManager() *transactions.Manager { func (b *StatusBackend) TxQueueManager() *transactions.Manager {
return m.txQueueManager return b.txQueueManager
} }
// IsNodeRunning confirm that node is running // IsNodeRunning confirm that node is running
func (m *StatusBackend) IsNodeRunning() bool { func (b *StatusBackend) IsNodeRunning() bool {
return m.nodeManager.IsNodeRunning() return b.nodeManager.IsNodeRunning()
} }
// StartNode start Status node, fails if node is already started // StartNode start Status node, fails if node is already started
func (m *StatusBackend) StartNode(config *params.NodeConfig) error { func (b *StatusBackend) StartNode(config *params.NodeConfig) error {
m.mu.Lock() b.mu.Lock()
defer m.mu.Unlock() defer b.mu.Unlock()
return m.startNode(config) return b.startNode(config)
} }
func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) { func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
err = fmt.Errorf("node crashed on start: %v", err) err = fmt.Errorf("node crashed on start: %v", err)
} }
}() }()
err = m.nodeManager.StartNode(config) err = b.nodeManager.StartNode(config)
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case node.RPCClientError: case node.RPCClientError:
@ -108,11 +108,11 @@ func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) {
signal.Send(signal.Envelope{Type: signal.EventNodeStarted}) signal.Send(signal.Envelope{Type: signal.EventNodeStarted})
// tx queue manager should be started after node is started, it depends // tx queue manager should be started after node is started, it depends
// on rpc client being created // on rpc client being created
m.txQueueManager.Start() b.txQueueManager.Start()
if err := m.registerHandlers(); err != nil { if err := b.registerHandlers(); err != nil {
log.Error("Handler registration failed", "err", err) log.Error("Handler registration failed", "err", err)
} }
if err := m.accountManager.ReSelectAccount(); err != nil { if err := b.accountManager.ReSelectAccount(); err != nil {
log.Error("Reselect account failed", "err", err) log.Error("Reselect account failed", "err", err)
} }
log.Info("Account reselected") log.Info("Account reselected")
@ -121,73 +121,75 @@ func (m *StatusBackend) startNode(config *params.NodeConfig) (err error) {
} }
// StopNode stop Status node. Stopped node cannot be resumed. // StopNode stop Status node. Stopped node cannot be resumed.
func (m *StatusBackend) StopNode() error { func (b *StatusBackend) StopNode() error {
m.mu.Lock() b.mu.Lock()
defer m.mu.Unlock() defer b.mu.Unlock()
return m.stopNode() return b.stopNode()
} }
func (m *StatusBackend) stopNode() error { func (b *StatusBackend) stopNode() error {
if !m.IsNodeRunning() { if !b.IsNodeRunning() {
return node.ErrNoRunningNode return node.ErrNoRunningNode
} }
m.txQueueManager.Stop() b.txQueueManager.Stop()
m.jailManager.Stop() b.jailManager.Stop()
return m.nodeManager.StopNode() defer signal.Send(signal.Envelope{Type: signal.EventNodeStopped})
return b.nodeManager.StopNode()
} }
// RestartNode restart running Status node, fails if node is not running // RestartNode restart running Status node, fails if node is not running
func (m *StatusBackend) RestartNode() error { func (b *StatusBackend) RestartNode() error {
if !m.IsNodeRunning() { if !b.IsNodeRunning() {
return node.ErrNoRunningNode return node.ErrNoRunningNode
} }
config, err := m.nodeManager.NodeConfig() config, err := b.nodeManager.NodeConfig()
if err != nil { if err != nil {
return err return err
} }
newcfg := *config newcfg := *config
if err := m.stopNode(); err != nil { if err := b.stopNode(); err != nil {
return err return err
} }
return m.startNode(&newcfg) return b.startNode(&newcfg)
} }
// ResetChainData remove chain data from data directory. // ResetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory. // Node is stopped, and new node is started, with clean data directory.
func (m *StatusBackend) ResetChainData() error { func (b *StatusBackend) ResetChainData() error {
m.mu.Lock() b.mu.Lock()
defer m.mu.Unlock() defer b.mu.Unlock()
config, err := m.nodeManager.NodeConfig() config, err := b.nodeManager.NodeConfig()
if err != nil { if err != nil {
return err return err
} }
newcfg := *config newcfg := *config
if err := m.stopNode(); err != nil { if err := b.stopNode(); err != nil {
return err return err
} }
if err := m.ResetChainData(); err != nil { // config is cleaned when node is stopped
if err := b.nodeManager.ResetChainData(&newcfg); err != nil {
return err return err
} }
signal.Send(signal.Envelope{Type: signal.EventChainDataRemoved}) signal.Send(signal.Envelope{Type: signal.EventChainDataRemoved})
return m.startNode(&newcfg) return b.startNode(&newcfg)
} }
// CallRPC executes RPC request on node's in-proc RPC server // CallRPC executes RPC request on node's in-proc RPC server
func (m *StatusBackend) CallRPC(inputJSON string) string { func (b *StatusBackend) CallRPC(inputJSON string) string {
client := m.nodeManager.RPCClient() client := b.nodeManager.RPCClient()
return client.CallRaw(inputJSON) return client.CallRaw(inputJSON)
} }
// SendTransaction creates a new transaction and waits until it's complete. // SendTransaction creates a new transaction and waits until it's complete.
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { func (b *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) {
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()
} }
tx := common.CreateTransaction(ctx, args) tx := common.CreateTransaction(ctx, args)
if err = m.txQueueManager.QueueTransaction(tx); err != nil { if err = b.txQueueManager.QueueTransaction(tx); err != nil {
return hash, err return hash, err
} }
rst := m.txQueueManager.WaitForTransaction(tx) rst := b.txQueueManager.WaitForTransaction(tx)
if rst.Error != nil { if rst.Error != nil {
return hash, rst.Error return hash, rst.Error
} }
@ -195,32 +197,32 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA
} }
// CompleteTransaction instructs backend to complete sending of a given transaction // CompleteTransaction instructs backend to complete sending of a given transaction
func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) { func (b *StatusBackend) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) {
return m.txQueueManager.CompleteTransaction(id, password) return b.txQueueManager.CompleteTransaction(id, password)
} }
// CompleteTransactions instructs backend to complete sending of multiple transactions // CompleteTransactions instructs backend to complete sending of multiple transactions
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { func (b *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
return m.txQueueManager.CompleteTransactions(ids, password) return b.txQueueManager.CompleteTransactions(ids, password)
} }
// DiscardTransaction discards a given transaction from transaction queue // DiscardTransaction discards a given transaction from transaction queue
func (m *StatusBackend) DiscardTransaction(id common.QueuedTxID) error { func (b *StatusBackend) DiscardTransaction(id common.QueuedTxID) error {
return m.txQueueManager.DiscardTransaction(id) return b.txQueueManager.DiscardTransaction(id)
} }
// DiscardTransactions discards given multiple transactions from transaction queue // DiscardTransactions discards given multiple transactions from transaction queue
func (m *StatusBackend) DiscardTransactions(ids []common.QueuedTxID) map[common.QueuedTxID]common.RawDiscardTransactionResult { func (b *StatusBackend) DiscardTransactions(ids []common.QueuedTxID) map[common.QueuedTxID]common.RawDiscardTransactionResult {
return m.txQueueManager.DiscardTransactions(ids) return b.txQueueManager.DiscardTransactions(ids)
} }
// registerHandlers attaches Status callback handlers to running node // registerHandlers attaches Status callback handlers to running node
func (m *StatusBackend) registerHandlers() error { func (b *StatusBackend) registerHandlers() error {
rpcClient := m.NodeManager().RPCClient() rpcClient := b.NodeManager().RPCClient()
if rpcClient == nil { if rpcClient == nil {
return node.ErrRPCClient return node.ErrRPCClient
} }
rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler()) rpcClient.RegisterHandler("eth_accounts", b.accountManager.AccountsRPCHandler())
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler) rpcClient.RegisterHandler("eth_sendTransaction", b.txQueueManager.SendTransactionRPCHandler)
return nil return nil
} }

View File

@ -131,13 +131,13 @@ func (m *NodeManager) stopNode() error {
} }
// ResetChainData removes chain data if node is not running. // ResetChainData removes chain data if node is not running.
func (m *NodeManager) ResetChainData() error { func (m *NodeManager) ResetChainData(config *params.NodeConfig) error {
if !m.IsNodeRunning() { if m.IsNodeRunning() {
return ErrNoRunningNode return ErrNodeExists
} }
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
chainDataDir := filepath.Join(m.config.DataDir, m.config.Name, "lightchaindata") chainDataDir := filepath.Join(config.DataDir, config.Name, "lightchaindata")
if _, err := os.Stat(chainDataDir); os.IsNotExist(err) { if _, err := os.Stat(chainDataDir); os.IsNotExist(err) {
// is it really an error, if we want to remove it as next step? // is it really an error, if we want to remove it as next step?
return err return err

View File

@ -196,6 +196,39 @@ func (s *APITestSuite) TestLogoutRemovesCells() {
require.Error(err, "Expected that cells was removed") require.Error(err, "Expected that cells was removed")
} }
func (s *APITestSuite) TestEventsNodeStartStop() {
envelopes := make(chan signal.Envelope, 3)
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err)
envelopes <- envelope
})
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
s.NoError(err)
s.NoError(s.api.StartNode(nodeConfig))
s.NoError(s.api.StopNode())
s.verifyEnvelopes(envelopes, signal.EventNodeStarted, signal.EventNodeReady, signal.EventNodeStopped)
s.NoError(s.api.StartNode(nodeConfig))
s.verifyEnvelopes(envelopes, signal.EventNodeStarted, signal.EventNodeReady)
s.NoError(s.api.RestartNode())
s.verifyEnvelopes(envelopes, signal.EventNodeStopped, signal.EventNodeStarted, signal.EventNodeReady)
s.NoError(s.api.StopNode())
s.verifyEnvelopes(envelopes, signal.EventNodeStopped)
}
func (s *APITestSuite) verifyEnvelopes(envelopes chan signal.Envelope, envelopeTypes ...string) {
for _, envelopeType := range envelopeTypes {
select {
case env := <-envelopes:
s.Equal(envelopeType, env.Type)
case <-time.After(1 * time.Second):
s.Fail("timeout waiting for envelope")
}
}
}
func (s *APITestSuite) TestNodeStartCrash() { func (s *APITestSuite) TestNodeStartCrash() {
// let's listen for node.crashed signal // let's listen for node.crashed signal
signalReceived := make(chan struct{}) signalReceived := make(chan struct{})

View File

@ -1,7 +1,9 @@
package api_test package api_test
import ( import (
"io/ioutil"
"math/rand" "math/rand"
"os"
"testing" "testing"
"time" "time"
@ -56,10 +58,10 @@ func (s *APIBackendTestSuite) TestRaceConditions() {
progress <- struct{}{} progress <- struct{}{}
}, },
// func(config *params.NodeConfig) { // func(config *params.NodeConfig) {
// log.Info("ResetChainData()") // log.Info("ResetChainData()")
// _, err := s.Backend.ResetChainData() // _, err := s.Backend.ResetChainData()
// s.T().Logf("ResetChainData(), error: %v", err) // s.T().Logf("ResetChainData(), error: %v", err)
// progress <- struct{}{} // progress <- struct{}{}
// }, // },
func(config *params.NodeConfig) { func(config *params.NodeConfig) {
log.Info("RestartNode()") log.Info("RestartNode()")
@ -220,20 +222,21 @@ func (s *APIBackendTestSuite) TestNetworkSwitching() {
s.NoError(s.Backend.StopNode()) s.NoError(s.Backend.StopNode())
} }
// FIXME(tiabc): There's also a test with the same name in geth/node/manager_test.go
// so this test should only check StatusBackend logic with a mocked version of the underlying NodeManager.
func (s *APIBackendTestSuite) TestResetChainData() { func (s *APIBackendTestSuite) TestResetChainData() {
s.T().Skip() if GetNetworkID() != params.StatusChainNetworkID {
s.T().Skip("test must be running on status network")
}
require := s.Require() require := s.Require()
require.NotNil(s.Backend) require.NotNil(s.Backend)
path, err := ioutil.TempDir("/tmp", "status-reset-chain-test")
require.NoError(err)
defer func() { s.NoError(os.RemoveAll(path)) }()
s.StartTestBackend() s.StartTestBackend(e2e.WithDataDir(path))
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.NodeManager()) EnsureNodeSync(s.Backend.NodeManager())
s.True(s.Backend.IsNodeRunning())
require.NoError(s.Backend.ResetChainData()) require.NoError(s.Backend.ResetChainData())
s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running s.True(s.Backend.IsNodeRunning()) // new node, with previous config should be running

View File

@ -24,6 +24,13 @@ func WithUpstream(url string) TestNodeOption {
} }
} }
// WithDataDir returns TestNodeOption that allows to set another data dir.
func WithDataDir(path string) TestNodeOption {
return func(config *params.NodeConfig) {
config.DataDir = path
}
}
// MakeTestNodeConfig defines a function to return a giving params.NodeConfig // MakeTestNodeConfig defines a function to return a giving params.NodeConfig
// where specific network addresses are assigned based on provieded network id. // where specific network addresses are assigned based on provieded network id.
func MakeTestNodeConfig(networkID int) (*params.NodeConfig, error) { func MakeTestNodeConfig(networkID int) (*params.NodeConfig, error) {