status-go/geth/node/status_node.go
Adrià Cidre 359b3621e9 [#797] Simplify node.Manager public api and rename it to StatusNode
- [x] [#797] : Remove unused methods PopulateStaticPeers, ReconnectStaticPeers, removeStaticPeers, removePeer
- [x] [#797] : Rename node.Manager to node. StatusNode and simplify its public api
- [x] [#797] : Rename all references to nodeManager to statusNode
2018-04-05 16:47:27 +02:00

430 lines
11 KiB
Go

package node
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/mailservice"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc"
)
// errors
var (
ErrNodeExists = errors.New("node is already running")
ErrNoRunningNode = errors.New("there is no running node")
ErrInvalidStatusNode = errors.New("status node is not properly initialized")
ErrInvalidWhisperService = errors.New("whisper service is unavailable")
ErrInvalidLightEthereumService = errors.New("LES service is unavailable")
ErrInvalidAccountManager = errors.New("could not retrieve account manager")
ErrAccountKeyStoreMissing = errors.New("account key store is not set")
ErrRPCClient = errors.New("failed to init RPC client")
)
// RPCClientError reported when rpc client is initialized.
type RPCClientError error
// EthNodeError is reported when node crashed on start up.
type EthNodeError error
// StatusNode abstracts contained geth node and provides helper methods to
// interact with it.
type StatusNode struct {
mu sync.RWMutex
config *params.NodeConfig // Status node configuration
node *node.Node // reference to Geth P2P stack/node
whisperService *whisper.Whisper // reference to Whisper service
lesService *les.LightEthereum // reference to LES service
rpcClient *rpc.Client // reference to RPC client
log log.Logger
}
// New makes new instance of StatusNode.
func New() *StatusNode {
return &StatusNode{
log: log.New("package", "status-go/geth/node.StatusNode"),
}
}
// Start starts current StatusNode, will fail if it's already started.
func (n *StatusNode) Start(config *params.NodeConfig) error {
n.mu.Lock()
defer n.mu.Unlock()
return n.start(config)
}
// start starts current StatusNode, will fail if it's already started.
func (n *StatusNode) start(config *params.NodeConfig) error {
if err := n.isAvailable(); err == nil {
return ErrNodeExists
}
ethNode, err := MakeNode(config)
if err != nil {
return err
}
n.node = ethNode
n.config = config
// activate MailService required for Offline Inboxing
if err := ethNode.Register(func(_ *node.ServiceContext) (node.Service, error) {
return mailservice.New(n), nil
}); err != nil {
return err
}
// start underlying node
if err := ethNode.Start(); err != nil {
return EthNodeError(err)
}
// init RPC client for this node
localRPCClient, err := n.node.Attach()
if err == nil {
n.rpcClient, err = rpc.NewClient(localRPCClient, n.config.UpstreamConfig)
}
if err != nil {
n.log.Error("Failed to create an RPC client", "error", err)
return RPCClientError(err)
}
return nil
}
// Stop will stop current StatusNode. A stopped node cannot be resumed.
func (n *StatusNode) Stop() error {
n.mu.Lock()
defer n.mu.Unlock()
return n.stop()
}
// stop will stop current StatusNode. A stopped node cannot be resumed.
func (n *StatusNode) stop() error {
if err := n.isAvailable(); err != nil {
return err
}
if err := n.node.Stop(); err != nil {
return err
}
n.node = nil
n.config = nil
n.lesService = nil
n.whisperService = nil
n.rpcClient = nil
return nil
}
// ResetChainData removes chain data if node is not running.
func (n *StatusNode) ResetChainData(config *params.NodeConfig) error {
if n.IsRunning() {
return ErrNodeExists
}
n.mu.Lock()
defer n.mu.Unlock()
chainDataDir := filepath.Join(config.DataDir, config.Name, "lightchaindata")
if _, err := os.Stat(chainDataDir); os.IsNotExist(err) {
// is it really an error, if we want to remove it as next step?
return err
}
err := os.RemoveAll(chainDataDir)
if err == nil {
n.log.Info("Chain data has been removed", "dir", chainDataDir)
}
return err
}
// IsRunning confirm that node is running.
func (n *StatusNode) IsRunning() bool {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return false
}
return true
}
// GethNode returns underlying geth node.
func (n *StatusNode) GethNode() (*node.Node, error) {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
return n.node, nil
}
// populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
func (n *StatusNode) populateStaticPeers() error {
if err := n.isAvailable(); err != nil {
return err
}
if !n.config.ClusterConfig.Enabled {
n.log.Info("Static peers are disabled")
return nil
}
for _, enode := range n.config.ClusterConfig.StaticNodes {
err := n.addPeer(enode)
if err != nil {
n.log.Warn("Static peer addition failed", "error", err)
continue
}
n.log.Info("Static peer added", "enode", enode)
}
return nil
}
func (n *StatusNode) removeStaticPeers() error {
if !n.config.ClusterConfig.Enabled {
n.log.Info("Static peers are disabled")
return nil
}
server := n.node.Server()
if server == nil {
return ErrNoRunningNode
}
for _, enode := range n.config.ClusterConfig.StaticNodes {
err := n.removePeer(enode)
if err != nil {
n.log.Warn("Static peer deletion failed", "error", err)
return err
}
n.log.Info("Static peer deleted", "enode", enode)
}
return nil
}
// ReconnectStaticPeers removes and adds static peers to a server.
func (n *StatusNode) ReconnectStaticPeers() error {
n.mu.Lock()
defer n.mu.Unlock()
if err := n.removeStaticPeers(); err != nil {
return err
}
return n.populateStaticPeers()
}
// AddPeer adds new static peer node
func (n *StatusNode) AddPeer(url string) error {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return err
}
return n.addPeer(url)
}
// addPeer adds new static peer node
func (n *StatusNode) addPeer(url string) error {
// Try to add the url as a static peer and return
parsedNode, err := discover.ParseNode(url)
if err != nil {
return err
}
n.node.Server().AddPeer(parsedNode)
return nil
}
func (n *StatusNode) removePeer(url string) error {
parsedNode, err := discover.ParseNode(url)
if err != nil {
return err
}
n.node.Server().RemovePeer(parsedNode)
return nil
}
// PeerCount returns the number of connected peers.
func (n *StatusNode) PeerCount() int {
if !n.IsRunning() {
return 0
}
return n.node.Server().PeerCount()
}
// Config exposes reference to running node's configuration
func (n *StatusNode) Config() (*params.NodeConfig, error) {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
return n.config, nil
}
// LightEthereumService exposes reference to LES service running on top of the node
func (n *StatusNode) LightEthereumService() (*les.LightEthereum, error) {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
if n.lesService == nil {
if err := n.node.Service(&n.lesService); err != nil {
n.log.Warn("Cannot obtain LES service", "error", err)
return nil, ErrInvalidLightEthereumService
}
}
if n.lesService == nil {
return nil, ErrInvalidLightEthereumService
}
return n.lesService, nil
}
// WhisperService exposes reference to Whisper service running on top of the node
func (n *StatusNode) WhisperService() (*whisper.Whisper, error) {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
if n.whisperService == nil {
if err := n.node.Service(&n.whisperService); err != nil {
n.log.Warn("Cannot obtain whisper service", "error", err)
return nil, ErrInvalidWhisperService
}
}
if n.whisperService == nil {
return nil, ErrInvalidWhisperService
}
return n.whisperService, nil
}
// AccountManager exposes reference to node's accounts manager
func (n *StatusNode) AccountManager() (*accounts.Manager, error) {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
accountManager := n.node.AccountManager()
if accountManager == nil {
return nil, ErrInvalidAccountManager
}
return accountManager, nil
}
// AccountKeyStore exposes reference to accounts key store
func (n *StatusNode) AccountKeyStore() (*keystore.KeyStore, error) {
n.mu.RLock()
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
accountManager := n.node.AccountManager()
if accountManager == nil {
return nil, ErrInvalidAccountManager
}
backends := accountManager.Backends(keystore.KeyStoreType)
if len(backends) == 0 {
return nil, ErrAccountKeyStoreMissing
}
keyStore, ok := backends[0].(*keystore.KeyStore)
if !ok {
return nil, ErrAccountKeyStoreMissing
}
return keyStore, nil
}
// RPCClient exposes reference to RPC client connected to the running node.
func (n *StatusNode) RPCClient() *rpc.Client {
n.mu.Lock()
defer n.mu.Unlock()
return n.rpcClient
}
// isAvailable check if we have a node running and make sure is fully started
func (n *StatusNode) isAvailable() error {
if n.node == nil || n.node.Server() == nil {
return ErrNoRunningNode
}
return nil
}
// tickerResolution is the delta to check blockchain sync progress.
const tickerResolution = time.Second
// EnsureSync waits until blockchain synchronization
// is complete and returns.
func (n *StatusNode) EnsureSync(ctx context.Context) error {
// Don't wait for any blockchain sync for the
// local private chain as blocks are never mined.
if n.config.NetworkID == params.StatusChainNetworkID {
return nil
}
return n.ensureSync(ctx)
}
func (n *StatusNode) ensureSync(ctx context.Context) error {
les, err := n.LightEthereumService()
if err != nil {
return fmt.Errorf("failed to get LES service: %v", err)
}
downloader := les.Downloader()
if downloader == nil {
return errors.New("LightEthereumService downloader is nil")
}
progress := downloader.Progress()
if n.PeerCount() > 0 && progress.CurrentBlock >= progress.HighestBlock {
n.log.Debug("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock)
return nil
}
ticker := time.NewTicker(tickerResolution)
defer ticker.Stop()
progressTicker := time.NewTicker(time.Minute)
defer progressTicker.Stop()
for {
select {
case <-ctx.Done():
return errors.New("timeout during node synchronization")
case <-ticker.C:
if n.PeerCount() == 0 {
n.log.Debug("No established connections with any peers, continue waiting for a sync")
continue
}
if downloader.Synchronising() {
n.log.Debug("Synchronization is in progress")
continue
}
progress = downloader.Progress()
if progress.CurrentBlock >= progress.HighestBlock {
n.log.Info("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock)
return nil
}
n.log.Debug("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock)
case <-progressTicker.C:
progress = downloader.Progress()
n.log.Warn("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock)
}
}
}