438 lines
12 KiB
Go
438 lines
12 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/accounts"
|
|
"github.com/ethereum/go-ethereum/accounts/keystore"
|
|
"github.com/ethereum/go-ethereum/les"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/node"
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
|
|
|
"github.com/status-im/status-go/geth/mailservice"
|
|
"github.com/status-im/status-go/geth/params"
|
|
"github.com/status-im/status-go/geth/rpc"
|
|
)
|
|
|
|
// errors
|
|
var (
|
|
ErrNodeExists = errors.New("node is already running")
|
|
ErrNoRunningNode = errors.New("there is no running node")
|
|
ErrInvalidNodeManager = errors.New("node manager is not properly initialized")
|
|
ErrInvalidWhisperService = errors.New("whisper service is unavailable")
|
|
ErrInvalidLightEthereumService = errors.New("LES service is unavailable")
|
|
ErrInvalidAccountManager = errors.New("could not retrieve account manager")
|
|
ErrAccountKeyStoreMissing = errors.New("account key store is not set")
|
|
ErrRPCClient = errors.New("failed to init RPC client")
|
|
)
|
|
|
|
// RPCClientError reported when rpc client is initialized.
|
|
type RPCClientError error
|
|
|
|
// EthNodeError is reported when node crashed on start up.
|
|
type EthNodeError error
|
|
|
|
// NodeManager manages Status node (which abstracts contained geth node)
|
|
// nolint: golint
|
|
// should be fixed at https://github.com/status-im/status-go/issues/200
|
|
type NodeManager struct {
|
|
mu sync.RWMutex
|
|
config *params.NodeConfig // Status node configuration
|
|
node *node.Node // reference to Geth P2P stack/node
|
|
|
|
whisperService *whisper.Whisper // reference to Whisper service
|
|
lesService *les.LightEthereum // reference to LES service
|
|
rpcClient *rpc.Client // reference to RPC client
|
|
log log.Logger
|
|
}
|
|
|
|
// NewNodeManager makes new instance of node manager
|
|
func NewNodeManager() *NodeManager {
|
|
return &NodeManager{
|
|
log: log.New("package", "status-go/geth/node.NodeManager"),
|
|
}
|
|
}
|
|
|
|
// StartNode start Status node, fails if node is already started
|
|
func (m *NodeManager) StartNode(config *params.NodeConfig) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.startNode(config)
|
|
}
|
|
|
|
// startNode start Status node, fails if node is already started
|
|
func (m *NodeManager) startNode(config *params.NodeConfig) error {
|
|
if err := m.isNodeAvailable(); err == nil {
|
|
return ErrNodeExists
|
|
}
|
|
|
|
ethNode, err := MakeNode(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.node = ethNode
|
|
m.config = config
|
|
|
|
// activate MailService required for Offline Inboxing
|
|
if err := ethNode.Register(func(_ *node.ServiceContext) (node.Service, error) {
|
|
return mailservice.New(m), nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// start underlying node
|
|
if err := ethNode.Start(); err != nil {
|
|
return EthNodeError(err)
|
|
}
|
|
// init RPC client for this node
|
|
localRPCClient, err := m.node.Attach()
|
|
if err == nil {
|
|
m.rpcClient, err = rpc.NewClient(localRPCClient, m.config.UpstreamConfig)
|
|
}
|
|
if err != nil {
|
|
m.log.Error("Failed to create an RPC client", "error", err)
|
|
return RPCClientError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StopNode stop Status node. Stopped node cannot be resumed.
|
|
func (m *NodeManager) StopNode() error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.stopNode()
|
|
}
|
|
|
|
// stopNode stop Status node. Stopped node cannot be resumed.
|
|
func (m *NodeManager) stopNode() error {
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return err
|
|
}
|
|
if err := m.node.Stop(); err != nil {
|
|
return err
|
|
}
|
|
m.node = nil
|
|
m.config = nil
|
|
m.lesService = nil
|
|
m.whisperService = nil
|
|
m.rpcClient = nil
|
|
return nil
|
|
}
|
|
|
|
// ResetChainData removes chain data if node is not running.
|
|
func (m *NodeManager) ResetChainData(config *params.NodeConfig) error {
|
|
if m.IsNodeRunning() {
|
|
return ErrNodeExists
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
chainDataDir := filepath.Join(config.DataDir, config.Name, "lightchaindata")
|
|
if _, err := os.Stat(chainDataDir); os.IsNotExist(err) {
|
|
// is it really an error, if we want to remove it as next step?
|
|
return err
|
|
}
|
|
err := os.RemoveAll(chainDataDir)
|
|
if err == nil {
|
|
m.log.Info("Chain data has been removed", "dir", chainDataDir)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// IsNodeRunning confirm that node is running
|
|
func (m *NodeManager) IsNodeRunning() bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Node returns underlying Status node
|
|
func (m *NodeManager) Node() (*node.Node, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return nil, err
|
|
}
|
|
return m.node, nil
|
|
}
|
|
|
|
// PopulateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
|
|
func (m *NodeManager) PopulateStaticPeers() error {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.populateStaticPeers()
|
|
}
|
|
|
|
// populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
|
|
func (m *NodeManager) populateStaticPeers() error {
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return err
|
|
}
|
|
if !m.config.ClusterConfig.Enabled {
|
|
m.log.Info("Static peers are disabled")
|
|
return nil
|
|
}
|
|
|
|
for _, enode := range m.config.ClusterConfig.StaticNodes {
|
|
err := m.addPeer(enode)
|
|
if err != nil {
|
|
m.log.Warn("Static peer addition failed", "error", err)
|
|
continue
|
|
}
|
|
m.log.Info("Static peer added", "enode", enode)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *NodeManager) removeStaticPeers() error {
|
|
if !m.config.ClusterConfig.Enabled {
|
|
m.log.Info("Static peers are disabled")
|
|
return nil
|
|
}
|
|
server := m.node.Server()
|
|
if server == nil {
|
|
return ErrNoRunningNode
|
|
}
|
|
for _, enode := range m.config.ClusterConfig.StaticNodes {
|
|
err := m.removePeer(enode)
|
|
if err != nil {
|
|
m.log.Warn("Static peer deletion failed", "error", err)
|
|
return err
|
|
}
|
|
m.log.Info("Static peer deleted", "enode", enode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReconnectStaticPeers removes and adds static peers to a server.
|
|
func (m *NodeManager) ReconnectStaticPeers() error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if err := m.removeStaticPeers(); err != nil {
|
|
return err
|
|
}
|
|
return m.populateStaticPeers()
|
|
}
|
|
|
|
// AddPeer adds new static peer node
|
|
func (m *NodeManager) AddPeer(url string) error {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return err
|
|
}
|
|
return m.addPeer(url)
|
|
}
|
|
|
|
// addPeer adds new static peer node
|
|
func (m *NodeManager) addPeer(url string) error {
|
|
// Try to add the url as a static peer and return
|
|
parsedNode, err := discover.ParseNode(url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.node.Server().AddPeer(parsedNode)
|
|
return nil
|
|
}
|
|
|
|
func (m *NodeManager) removePeer(url string) error {
|
|
parsedNode, err := discover.ParseNode(url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.node.Server().RemovePeer(parsedNode)
|
|
return nil
|
|
}
|
|
|
|
// PeerCount returns the number of connected peers.
|
|
func (m *NodeManager) PeerCount() int {
|
|
if !m.IsNodeRunning() {
|
|
return 0
|
|
}
|
|
return m.node.Server().PeerCount()
|
|
}
|
|
|
|
// NodeConfig exposes reference to running node's configuration
|
|
func (m *NodeManager) NodeConfig() (*params.NodeConfig, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return nil, err
|
|
}
|
|
return m.config, nil
|
|
}
|
|
|
|
// LightEthereumService exposes reference to LES service running on top of the node
|
|
func (m *NodeManager) LightEthereumService() (*les.LightEthereum, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return nil, err
|
|
}
|
|
if m.lesService == nil {
|
|
if err := m.node.Service(&m.lesService); err != nil {
|
|
m.log.Warn("Cannot obtain LES service", "error", err)
|
|
return nil, ErrInvalidLightEthereumService
|
|
}
|
|
}
|
|
if m.lesService == nil {
|
|
return nil, ErrInvalidLightEthereumService
|
|
}
|
|
return m.lesService, nil
|
|
}
|
|
|
|
// WhisperService exposes reference to Whisper service running on top of the node
|
|
func (m *NodeManager) WhisperService() (*whisper.Whisper, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return nil, err
|
|
}
|
|
if m.whisperService == nil {
|
|
if err := m.node.Service(&m.whisperService); err != nil {
|
|
m.log.Warn("Cannot obtain whisper service", "error", err)
|
|
return nil, ErrInvalidWhisperService
|
|
}
|
|
}
|
|
if m.whisperService == nil {
|
|
return nil, ErrInvalidWhisperService
|
|
}
|
|
return m.whisperService, nil
|
|
}
|
|
|
|
// AccountManager exposes reference to node's accounts manager
|
|
func (m *NodeManager) AccountManager() (*accounts.Manager, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return nil, err
|
|
}
|
|
accountManager := m.node.AccountManager()
|
|
if accountManager == nil {
|
|
return nil, ErrInvalidAccountManager
|
|
}
|
|
return accountManager, nil
|
|
}
|
|
|
|
// AccountKeyStore exposes reference to accounts key store
|
|
func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if err := m.isNodeAvailable(); err != nil {
|
|
return nil, err
|
|
}
|
|
accountManager := m.node.AccountManager()
|
|
if accountManager == nil {
|
|
return nil, ErrInvalidAccountManager
|
|
}
|
|
|
|
backends := accountManager.Backends(keystore.KeyStoreType)
|
|
if len(backends) == 0 {
|
|
return nil, ErrAccountKeyStoreMissing
|
|
}
|
|
|
|
keyStore, ok := backends[0].(*keystore.KeyStore)
|
|
if !ok {
|
|
return nil, ErrAccountKeyStoreMissing
|
|
}
|
|
|
|
return keyStore, nil
|
|
}
|
|
|
|
// RPCClient exposes reference to RPC client connected to the running node.
|
|
func (m *NodeManager) RPCClient() *rpc.Client {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.rpcClient
|
|
}
|
|
|
|
// isNodeAvailable check if we have a node running and make sure is fully started
|
|
func (m *NodeManager) isNodeAvailable() error {
|
|
if m.node == nil || m.node.Server() == nil {
|
|
return ErrNoRunningNode
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// tickerResolution is the delta to check blockchain sync progress.
|
|
const tickerResolution = time.Second
|
|
|
|
// EnsureSync waits until blockchain synchronization
|
|
// is complete and returns.
|
|
func (m *NodeManager) EnsureSync(ctx context.Context) error {
|
|
// Don't wait for any blockchain sync for the
|
|
// local private chain as blocks are never mined.
|
|
if m.config.NetworkID == params.StatusChainNetworkID {
|
|
return nil
|
|
}
|
|
|
|
return m.ensureSync(ctx)
|
|
}
|
|
|
|
func (m *NodeManager) ensureSync(ctx context.Context) error {
|
|
les, err := m.LightEthereumService()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get LES service: %v", err)
|
|
}
|
|
|
|
downloader := les.Downloader()
|
|
if downloader == nil {
|
|
return errors.New("LightEthereumService downloader is nil")
|
|
}
|
|
|
|
progress := downloader.Progress()
|
|
if m.PeerCount() > 0 && progress.CurrentBlock >= progress.HighestBlock {
|
|
m.log.Debug("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock)
|
|
return nil
|
|
}
|
|
|
|
ticker := time.NewTicker(tickerResolution)
|
|
defer ticker.Stop()
|
|
|
|
progressTicker := time.NewTicker(time.Minute)
|
|
defer progressTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.New("timeout during node synchronization")
|
|
case <-ticker.C:
|
|
if m.PeerCount() == 0 {
|
|
m.log.Debug("No established connections with any peers, continue waiting for a sync")
|
|
continue
|
|
}
|
|
if downloader.Synchronising() {
|
|
m.log.Debug("Synchronization is in progress")
|
|
continue
|
|
}
|
|
progress = downloader.Progress()
|
|
if progress.CurrentBlock >= progress.HighestBlock {
|
|
m.log.Info("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock)
|
|
return nil
|
|
}
|
|
m.log.Debug("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock)
|
|
case <-progressTicker.C:
|
|
progress = downloader.Progress()
|
|
m.log.Warn("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock)
|
|
}
|
|
}
|
|
}
|