status-go/geth/node/manager.go
Dmitry Shulyak b2580c79d7
Add destructive tests (#646)
The network disconnect is introduced by removing the default gateway, an easily reversible condition.
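
For illustration only, here is a minimal sketch of how such a disconnect could be toggled on Linux with the `ip` tool; the helper names below are hypothetical and are not the actual test code:

```
package destructive

import (
	"errors"
	"os/exec"
	"strings"
)

// defaultGateway returns the current default gateway, e.g. "172.17.0.1".
func defaultGateway() (string, error) {
	// expected output: "default via 172.17.0.1 dev eth0 ..."
	out, err := exec.Command("ip", "route", "show", "default").Output()
	if err != nil {
		return "", err
	}
	fields := strings.Fields(string(out))
	if len(fields) < 3 || fields[0] != "default" {
		return "", errors.New("unexpected `ip route` output")
	}
	return fields[2], nil
}

// dropNetwork removes the default route, cutting off external connectivity.
func dropNetwork() error {
	return exec.Command("ip", "route", "del", "default").Run()
}

// restoreNetwork re-adds the default route saved before the disconnect.
func restoreNetwork(gateway string) error {
	return exec.Command("ip", "route", "add", "default", "via", gateway).Run()
}
```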

On my local machine it takes about 30 seconds for peers to reconnect after connectivity is restored. As you might guess, this is not an accident: there is a 30-second dial-expiration timeout, used in p2p.Server to guarantee that peers are not dialed too often.
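
In practice this means a test should wait with a deadline comfortably above those 30 seconds before declaring reconnection failed. A rough sketch of such a wait using the manager's `PeerCount` (the helper name and timings are illustrative, not part of this change):

```
package destructive

import (
	"fmt"
	"time"

	"github.com/status-im/status-go/geth/node"
)

// waitForPeers polls the NodeManager until at least min peers are connected
// or the deadline expires; the deadline should sit above the ~30s dial expiry.
func waitForPeers(m *node.NodeManager, min int, deadline time.Duration) error {
	timeout := time.After(deadline)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			return fmt.Errorf("only %d peer(s) connected after %s", m.PeerCount(), deadline)
		case <-ticker.C:
			if m.PeerCount() >= min {
				return nil
			}
		}
	}
}
```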

Additionally, I added a small script to the Makefile to run such tests in a Docker environment. Usage example:

```
make docker-test ARGS="./t/destructive/ -v -network=4"
```
2018-02-19 17:32:58 +02:00

package node
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/geth/mailservice"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc"
)
// errors
var (
ErrNodeExists = errors.New("node is already running")
ErrNoRunningNode = errors.New("there is no running node")
ErrInvalidNodeManager = errors.New("node manager is not properly initialized")
ErrInvalidWhisperService = errors.New("whisper service is unavailable")
ErrInvalidLightEthereumService = errors.New("LES service is unavailable")
ErrInvalidAccountManager = errors.New("could not retrieve account manager")
ErrAccountKeyStoreMissing = errors.New("account key store is not set")
ErrRPCClient = errors.New("failed to init RPC client")
)
// RPCClientError is reported when the RPC client fails to initialize.
type RPCClientError error
// EthNodeError is reported when the node crashes on startup.
type EthNodeError error
// NodeManager manages the Status node (which abstracts the contained geth node)
// nolint: golint
// should be fixed at https://github.com/status-im/status-go/issues/200
type NodeManager struct {
mu sync.RWMutex
config *params.NodeConfig // Status node configuration
node *node.Node // reference to Geth P2P stack/node
whisperService *whisper.Whisper // reference to Whisper service
lesService *les.LightEthereum // reference to LES service
rpcClient *rpc.Client // reference to RPC client
}
// NewNodeManager makes a new instance of NodeManager
func NewNodeManager() *NodeManager {
return &NodeManager{}
}
// StartNode starts the Status node; it fails if the node is already started
func (m *NodeManager) StartNode(config *params.NodeConfig) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.startNode(config)
}
// startNode starts the Status node; it fails if the node is already started
func (m *NodeManager) startNode(config *params.NodeConfig) error {
if err := m.isNodeAvailable(); err == nil {
return ErrNodeExists
}
m.initLog(config)
ethNode, err := MakeNode(config)
if err != nil {
return err
}
m.node = ethNode
m.config = config
// activate MailService, required for Offline Inboxing
if err := ethNode.Register(func(_ *node.ServiceContext) (node.Service, error) {
return mailservice.New(m), nil
}); err != nil {
return err
}
// start underlying node
if err := ethNode.Start(); err != nil {
return EthNodeError(err)
}
// init RPC client for this node
localRPCClient, err := m.node.Attach()
if err == nil {
m.rpcClient, err = rpc.NewClient(localRPCClient, m.config.UpstreamConfig)
}
if err != nil {
log.Error("Failed to create an RPC client", "error", err)
return RPCClientError(err)
}
// populate static peers; the goroutine exits when the node is stopped
go func() {
if err := m.PopulateStaticPeers(); err != nil {
log.Error("Static peers population", "error", err)
}
}()
return nil
}
// StopNode stops the Status node. A stopped node cannot be resumed.
func (m *NodeManager) StopNode() error {
m.mu.Lock()
defer m.mu.Unlock()
return m.stopNode()
}
// stopNode stops the Status node. A stopped node cannot be resumed.
func (m *NodeManager) stopNode() error {
if err := m.isNodeAvailable(); err != nil {
return err
}
if err := m.node.Stop(); err != nil {
return err
}
m.node = nil
m.config = nil
m.lesService = nil
m.whisperService = nil
m.rpcClient = nil
return nil
}
// ResetChainData removes chain data if node is not running.
func (m *NodeManager) ResetChainData(config *params.NodeConfig) error {
if m.IsNodeRunning() {
return ErrNodeExists
}
m.mu.Lock()
defer m.mu.Unlock()
chainDataDir := filepath.Join(config.DataDir, config.Name, "lightchaindata")
if _, err := os.Stat(chainDataDir); os.IsNotExist(err) {
// is it really an error, if we want to remove it in the next step anyway?
return err
}
err := os.RemoveAll(chainDataDir)
if err == nil {
log.Info("Chain data has been removed", "dir", chainDataDir)
}
return err
}
// IsNodeRunning confirms that the node is running
func (m *NodeManager) IsNodeRunning() bool {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return false
}
return true
}
// Node returns the underlying Status node
func (m *NodeManager) Node() (*node.Node, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
return m.node, nil
}
// PopulateStaticPeers connects the current node to our publicly available LES/SHH/Swarm cluster
func (m *NodeManager) PopulateStaticPeers() error {
m.mu.RLock()
defer m.mu.RUnlock()
return m.populateStaticPeers()
}
// populateStaticPeers connects the current node to our publicly available LES/SHH/Swarm cluster
func (m *NodeManager) populateStaticPeers() error {
if err := m.isNodeAvailable(); err != nil {
return err
}
if !m.config.BootClusterConfig.Enabled {
log.Info("Boot cluster is disabled")
return nil
}
for _, enode := range m.config.BootClusterConfig.BootNodes {
err := m.addPeer(enode)
if err != nil {
log.Warn("Boot node addition failed", "error", err)
continue
}
log.Info("Boot node added", "enode", enode)
}
return nil
}
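// removeStaticPeers removes the configured boot cluster nodes from the server's static peers.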
func (m *NodeManager) removeStaticPeers() error {
if !m.config.BootClusterConfig.Enabled {
log.Info("Boot cluster is disabled")
return nil
}
server := m.node.Server()
if server == nil {
return ErrNoRunningNode
}
for _, enode := range m.config.BootClusterConfig.BootNodes {
err := m.removePeer(enode)
if err != nil {
log.Warn("Boot node deletion failed", "error", err)
return err
}
log.Info("Boot node deleted", "enode", enode)
}
return nil
}
// ReconnectStaticPeers removes and then re-adds the static peers on the server.
func (m *NodeManager) ReconnectStaticPeers() error {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.removeStaticPeers(); err != nil {
return err
}
return m.populateStaticPeers()
}
// AddPeer adds a new static peer node
func (m *NodeManager) AddPeer(url string) error {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return err
}
return m.addPeer(url)
}
// addPeer adds a new static peer node
func (m *NodeManager) addPeer(url string) error {
// Try to add the url as a static peer and return
parsedNode, err := discover.ParseNode(url)
if err != nil {
return err
}
m.node.Server().AddPeer(parsedNode)
return nil
}
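// removePeer parses the given enode URL and removes it from the server's static peers.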
func (m *NodeManager) removePeer(url string) error {
parsedNode, err := discover.ParseNode(url)
if err != nil {
return err
}
m.node.Server().RemovePeer(parsedNode)
return nil
}
// PeerCount returns the number of connected peers.
func (m *NodeManager) PeerCount() int {
if !m.IsNodeRunning() {
return 0
}
return m.node.Server().PeerCount()
}
// NodeConfig exposes a reference to the running node's configuration
func (m *NodeManager) NodeConfig() (*params.NodeConfig, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
return m.config, nil
}
// LightEthereumService exposes a reference to the LES service running on top of the node
func (m *NodeManager) LightEthereumService() (*les.LightEthereum, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
if m.lesService == nil {
if err := m.node.Service(&m.lesService); err != nil {
log.Warn("Cannot obtain LES service", "error", err)
return nil, ErrInvalidLightEthereumService
}
}
if m.lesService == nil {
return nil, ErrInvalidLightEthereumService
}
return m.lesService, nil
}
// WhisperService exposes a reference to the Whisper service running on top of the node
func (m *NodeManager) WhisperService() (*whisper.Whisper, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
if m.whisperService == nil {
if err := m.node.Service(&m.whisperService); err != nil {
log.Warn("Cannot obtain whisper service", "error", err)
return nil, ErrInvalidWhisperService
}
}
if m.whisperService == nil {
return nil, ErrInvalidWhisperService
}
return m.whisperService, nil
}
// AccountManager exposes a reference to the node's account manager
func (m *NodeManager) AccountManager() (*accounts.Manager, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
accountManager := m.node.AccountManager()
if accountManager == nil {
return nil, ErrInvalidAccountManager
}
return accountManager, nil
}
// AccountKeyStore exposes a reference to the accounts key store
func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.isNodeAvailable(); err != nil {
return nil, err
}
accountManager := m.node.AccountManager()
if accountManager == nil {
return nil, ErrInvalidAccountManager
}
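// take the first backend of the keystore type registered with the account manager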
backends := accountManager.Backends(keystore.KeyStoreType)
if len(backends) == 0 {
return nil, ErrAccountKeyStoreMissing
}
keyStore, ok := backends[0].(*keystore.KeyStore)
if !ok {
return nil, ErrAccountKeyStoreMissing
}
return keyStore, nil
}
// RPCClient exposes a reference to the RPC client connected to the running node.
func (m *NodeManager) RPCClient() *rpc.Client {
m.mu.Lock()
defer m.mu.Unlock()
return m.rpcClient
}
// initLog initializes global logger parameters based on the
// provided node configuration.
func (m *NodeManager) initLog(config *params.NodeConfig) {
log.SetLevel(config.LogLevel)
if config.LogFile != "" {
err := log.SetLogFile(config.LogFile)
if err != nil {
fmt.Println("Failed to open log file, using stdout")
}
}
}
// isNodeAvailable checks that we have a running node and that it is fully started
func (m *NodeManager) isNodeAvailable() error {
if m.node == nil || m.node.Server() == nil {
return ErrNoRunningNode
}
return nil
}
// tickerResolution is the interval at which blockchain sync progress is checked.
const tickerResolution = time.Second
// EnsureSync waits until blockchain synchronization
// is complete and returns.
func (m *NodeManager) EnsureSync(ctx context.Context) error {
// Don't wait for any blockchain sync for the
// local private chain as blocks are never mined.
if m.config.NetworkID == params.StatusChainNetworkID {
return nil
}
return m.ensureSync(ctx)
}
func (m *NodeManager) ensureSync(ctx context.Context) error {
les, err := m.LightEthereumService()
if err != nil {
return fmt.Errorf("failed to get LES service: %v", err)
}
downloader := les.Downloader()
if downloader == nil {
return errors.New("LightEthereumService downloader is nil")
}
progress := downloader.Progress()
if m.PeerCount() > 0 && progress.CurrentBlock >= progress.HighestBlock {
log.Debug("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock)
return nil
}
ticker := time.NewTicker(tickerResolution)
defer ticker.Stop()
progressTicker := time.NewTicker(time.Minute)
defer progressTicker.Stop()
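// poll the sync status every tickerResolution and log overall progress once a minute,
// until the sync completes or the context is done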
for {
select {
case <-ctx.Done():
return errors.New("timeout during node synchronization")
case <-ticker.C:
if m.PeerCount() == 0 {
log.Debug("No established connections with any peers, continue waiting for a sync")
continue
}
if downloader.Synchronising() {
log.Debug("Synchronization is in progress")
continue
}
progress = downloader.Progress()
if progress.CurrentBlock >= progress.HighestBlock {
log.Info("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock)
return nil
}
log.Debug("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock)
case <-progressTicker.C:
progress = downloader.Progress()
log.Warn("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock)
}
}
}