package node import ( "context" "errors" "fmt" "net" "os" "path/filepath" "reflect" "sync" "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" ma "github.com/multiformats/go-multiaddr" "github.com/syndtr/goleveldb/leveldb" "github.com/status-im/status-go/db" "github.com/status-im/status-go/discovery" "github.com/status-im/status-go/params" "github.com/status-im/status-go/peers" "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/peer" "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/status" ) // tickerResolution is the delta to check blockchain sync progress. const tickerResolution = time.Second // errors var ( ErrNodeRunning = errors.New("node is already running") ErrNoGethNode = errors.New("geth node is not available") ErrNoRunningNode = errors.New("there is no running node") ErrAccountKeyStoreMissing = errors.New("account key store is not set") ErrServiceUnknown = errors.New("service unknown") ) // StatusNode abstracts contained geth node and provides helper methods to // interact with it. type StatusNode struct { mu sync.RWMutex config *params.NodeConfig // Status node configuration gethNode *node.Node // reference to Geth P2P stack/node rpcClient *rpc.Client // reference to public RPC client rpcPrivateClient *rpc.Client // reference to private RPC client (can call private APIs) discovery discovery.Discovery register *peers.Register peerPool *peers.PeerPool db *leveldb.DB // used as a cache for PeerPool log log.Logger } // New makes new instance of StatusNode. func New() *StatusNode { return &StatusNode{ log: log.New("package", "status-go/node.StatusNode"), } } // Config exposes reference to running node's configuration func (n *StatusNode) Config() *params.NodeConfig { n.mu.RLock() defer n.mu.RUnlock() return n.config } // GethNode returns underlying geth node. func (n *StatusNode) GethNode() *node.Node { n.mu.RLock() defer n.mu.RUnlock() return n.gethNode } // Server retrieves the currently running P2P network layer. func (n *StatusNode) Server() *p2p.Server { n.mu.RLock() defer n.mu.RUnlock() if n.gethNode == nil { return nil } return n.gethNode.Server() } func (n *StatusNode) startWithDB(config *params.NodeConfig, db *leveldb.DB, services []node.ServiceConstructor) error { if err := n.createNode(config, db); err != nil { return err } n.config = config if err := n.start(services); err != nil { return err } if err := n.setupRPCClient(); err != nil { return err } if n.discoveryEnabled() { return n.startDiscovery() } return nil } // Start starts current StatusNode, will fail if it's already started. func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceConstructor) error { n.mu.Lock() defer n.mu.Unlock() n.log.Debug("starting with NodeConfig", "ClusterConfig", config.ClusterConfig) if n.isRunning() { return ErrNodeRunning } db, err := db.Create(config.DataDir, params.StatusDatabase) if err != nil { return err } n.db = db err = n.startWithDB(config, db, services) if err != nil { if dberr := db.Close(); dberr != nil { n.log.Error("error while closing leveldb after node crash", "error", dberr) } n.db = nil return err } return nil } func (n *StatusNode) createNode(config *params.NodeConfig, db *leveldb.DB) (err error) { n.gethNode, err = MakeNode(config, db) return err } // start starts current StatusNode, will fail if it's already started. func (n *StatusNode) start(services []node.ServiceConstructor) error { for _, service := range services { if err := n.gethNode.Register(service); err != nil { return err } } return n.gethNode.Start() } func (n *StatusNode) setupRPCClient() (err error) { // setup public RPC client gethNodeClient, err := n.gethNode.AttachPublic() if err != nil { return } n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.UpstreamConfig) if err != nil { return } // setup private RPC client gethNodePrivateClient, err := n.gethNode.Attach() if err != nil { return } n.rpcPrivateClient, err = rpc.NewClient(gethNodePrivateClient, n.config.UpstreamConfig) return } func (n *StatusNode) discoveryEnabled() bool { return n.config != nil && (!n.config.NoDiscovery || n.config.Rendezvous) && n.config.ClusterConfig != nil } func (n *StatusNode) discoverNode() *discover.Node { if !n.isRunning() { return nil } discNode := n.gethNode.Server().Self() if n.config.AdvertiseAddr != "" { n.log.Info("using AdvertiseAddr for rendezvous", "addr", n.config.AdvertiseAddr) discNode.IP = net.ParseIP(n.config.AdvertiseAddr) } return discNode } func (n *StatusNode) startRendezvous() (discovery.Discovery, error) { if !n.config.Rendezvous { return nil, errors.New("rendezvous is not enabled") } if len(n.config.ClusterConfig.RendezvousNodes) == 0 { return nil, errors.New("rendezvous node must be provided if rendezvous discovery is enabled") } maddrs := make([]ma.Multiaddr, len(n.config.ClusterConfig.RendezvousNodes)) for i, addr := range n.config.ClusterConfig.RendezvousNodes { var err error maddrs[i], err = ma.NewMultiaddr(addr) if err != nil { return nil, fmt.Errorf("failed to parse rendezvous node %s: %v", n.config.ClusterConfig.RendezvousNodes[0], err) } } return discovery.NewRendezvous(maddrs, n.gethNode.Server().PrivateKey, n.discoverNode()) } func (n *StatusNode) startDiscovery() error { discoveries := []discovery.Discovery{} if !n.config.NoDiscovery { discoveries = append(discoveries, discovery.NewDiscV5( n.gethNode.Server().PrivateKey, n.config.ListenAddr, parseNodesV5(n.config.ClusterConfig.BootNodes))) } if n.config.Rendezvous { d, err := n.startRendezvous() if err != nil { return err } discoveries = append(discoveries, d) } if len(discoveries) == 0 { return errors.New("wasn't able to register any discovery") } else if len(discoveries) > 1 { n.discovery = discovery.NewMultiplexer(discoveries) } else { n.discovery = discoveries[0] } log.Debug( "using discovery", "instance", reflect.TypeOf(n.discovery), "registerTopics", n.config.RegisterTopics, "requireTopics", n.config.RequireTopics, ) n.register = peers.NewRegister(n.discovery, n.config.RegisterTopics...) options := peers.NewDefaultOptions() // TODO(dshulyak) consider adding a flag to define this behaviour options.AllowStop = len(n.config.RegisterTopics) == 0 options.TrustedMailServers = parseNodesToNodeID(n.config.ClusterConfig.TrustedMailServers) options.MailServerRegistryAddress = n.config.MailServerRegistryAddress n.peerPool = peers.NewPeerPool( n.discovery, n.config.RequireTopics, peers.NewCache(n.db), options, ) if err := n.discovery.Start(); err != nil { return err } if err := n.register.Start(); err != nil { return err } return n.peerPool.Start(n.gethNode.Server(), n.rpcClient) } // Stop will stop current StatusNode. A stopped node cannot be resumed. func (n *StatusNode) Stop() error { n.mu.Lock() defer n.mu.Unlock() if !n.isRunning() { return ErrNoRunningNode } return n.stop() } // stop will stop current StatusNode. A stopped node cannot be resumed. func (n *StatusNode) stop() error { if n.discoveryEnabled() { if err := n.stopDiscovery(); err != nil { n.log.Error("Error stopping the PeerPool", "error", err) } n.register = nil n.peerPool = nil } if err := n.gethNode.Stop(); err != nil { return err } n.rpcClient = nil n.rpcPrivateClient = nil // We need to clear `gethNode` because config is passed to `Start()` // and may be completely different. Similarly with `config`. n.gethNode = nil n.config = nil if n.db != nil { err := n.db.Close() n.db = nil return err } return nil } func (n *StatusNode) stopDiscovery() error { n.register.Stop() n.peerPool.Stop() return n.discovery.Stop() } // ResetChainData removes chain data if node is not running. func (n *StatusNode) ResetChainData(config *params.NodeConfig) error { n.mu.Lock() defer n.mu.Unlock() if n.isRunning() { return ErrNodeRunning } chainDataDir := filepath.Join(config.DataDir, config.Name, "lightchaindata") if _, err := os.Stat(chainDataDir); os.IsNotExist(err) { return err } err := os.RemoveAll(chainDataDir) if err == nil { n.log.Info("Chain data has been removed", "dir", chainDataDir) } return err } // IsRunning confirm that node is running. func (n *StatusNode) IsRunning() bool { n.mu.RLock() defer n.mu.RUnlock() return n.isRunning() } func (n *StatusNode) isRunning() bool { return n.gethNode != nil && n.gethNode.Server() != nil } // populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster func (n *StatusNode) populateStaticPeers() error { if n.config.ClusterConfig == nil || !n.config.ClusterConfig.Enabled { n.log.Info("Static peers are disabled") return nil } for _, enode := range n.config.ClusterConfig.StaticNodes { if err := n.addPeer(enode); err != nil { n.log.Error("Static peer addition failed", "error", err) return err } n.log.Info("Static peer added", "enode", enode) } return nil } func (n *StatusNode) removeStaticPeers() error { if n.config.ClusterConfig == nil || !n.config.ClusterConfig.Enabled { n.log.Info("Static peers are disabled") return nil } for _, enode := range n.config.ClusterConfig.StaticNodes { if err := n.removePeer(enode); err != nil { n.log.Error("Static peer deletion failed", "error", err) return err } n.log.Info("Static peer deleted", "enode", enode) } return nil } // ReconnectStaticPeers removes and adds static peers to a server. func (n *StatusNode) ReconnectStaticPeers() error { n.mu.Lock() defer n.mu.Unlock() if !n.isRunning() { return ErrNoRunningNode } if err := n.removeStaticPeers(); err != nil { return err } return n.populateStaticPeers() } // AddPeer adds new static peer node func (n *StatusNode) AddPeer(url string) error { n.mu.RLock() defer n.mu.RUnlock() return n.addPeer(url) } // addPeer adds new static peer node func (n *StatusNode) addPeer(url string) error { parsedNode, err := discover.ParseNode(url) if err != nil { return err } if !n.isRunning() { return ErrNoRunningNode } n.gethNode.Server().AddPeer(parsedNode) return nil } func (n *StatusNode) removePeer(url string) error { parsedNode, err := discover.ParseNode(url) if err != nil { return err } if !n.isRunning() { return ErrNoRunningNode } n.gethNode.Server().RemovePeer(parsedNode) return nil } // PeerCount returns the number of connected peers. func (n *StatusNode) PeerCount() int { n.mu.RLock() defer n.mu.RUnlock() if !n.isRunning() { return 0 } return n.gethNode.Server().PeerCount() } // gethService is a wrapper for gethNode.Service which retrieves a currently // running service registered of a specific type. func (n *StatusNode) gethService(serviceInstance interface{}) error { if !n.isRunning() { return ErrNoRunningNode } if err := n.gethNode.Service(serviceInstance); err != nil { return err } return nil } // LightEthereumService exposes reference to LES service running on top of the node func (n *StatusNode) LightEthereumService() (l *les.LightEthereum, err error) { n.mu.RLock() defer n.mu.RUnlock() err = n.gethService(&l) if err == node.ErrServiceUnknown { err = ErrServiceUnknown } return } // StatusService exposes reference to status service running on top of the node func (n *StatusNode) StatusService() (st *status.Service, err error) { n.mu.RLock() defer n.mu.RUnlock() err = n.gethService(&st) if err == node.ErrServiceUnknown { err = ErrServiceUnknown } return } // PeerService exposes reference to peer service running on top of the node. func (n *StatusNode) PeerService() (st *peer.Service, err error) { n.mu.RLock() defer n.mu.RUnlock() err = n.gethService(&st) if err == node.ErrServiceUnknown { err = ErrServiceUnknown } return } // WhisperService exposes reference to Whisper service running on top of the node func (n *StatusNode) WhisperService() (w *whisper.Whisper, err error) { n.mu.RLock() defer n.mu.RUnlock() err = n.gethService(&w) if err == node.ErrServiceUnknown { err = ErrServiceUnknown } return } // ShhExtService exposes reference to shh extension service running on top of the node func (n *StatusNode) ShhExtService() (s *shhext.Service, err error) { n.mu.RLock() defer n.mu.RUnlock() err = n.gethService(&s) if err == node.ErrServiceUnknown { err = ErrServiceUnknown } return } // AccountManager exposes reference to node's accounts manager func (n *StatusNode) AccountManager() (*accounts.Manager, error) { n.mu.RLock() defer n.mu.RUnlock() if n.gethNode == nil { return nil, ErrNoGethNode } return n.gethNode.AccountManager(), nil } // AccountKeyStore exposes reference to accounts key store func (n *StatusNode) AccountKeyStore() (*keystore.KeyStore, error) { n.mu.RLock() defer n.mu.RUnlock() if n.gethNode == nil { return nil, ErrNoGethNode } accountManager := n.gethNode.AccountManager() backends := accountManager.Backends(keystore.KeyStoreType) if len(backends) == 0 { return nil, ErrAccountKeyStoreMissing } keyStore, ok := backends[0].(*keystore.KeyStore) if !ok { return nil, ErrAccountKeyStoreMissing } return keyStore, nil } // RPCClient exposes reference to RPC client connected to the running node. func (n *StatusNode) RPCClient() *rpc.Client { n.mu.RLock() defer n.mu.RUnlock() return n.rpcClient } // RPCPrivateClient exposes reference to RPC client connected to the running node // that can call both public and private APIs. func (n *StatusNode) RPCPrivateClient() *rpc.Client { n.mu.Lock() defer n.mu.Unlock() return n.rpcPrivateClient } // EnsureSync waits until blockchain synchronization // is complete and returns. func (n *StatusNode) EnsureSync(ctx context.Context) error { // Don't wait for any blockchain sync for the // local private chain as blocks are never mined. if n.config.NetworkID == 0 || n.config.NetworkID == params.StatusChainNetworkID { return nil } return n.ensureSync(ctx) } func (n *StatusNode) ensureSync(ctx context.Context) error { les, err := n.LightEthereumService() if err != nil { return fmt.Errorf("failed to get LES service: %v", err) } downloader := les.Downloader() if downloader == nil { return errors.New("LightEthereumService downloader is nil") } progress := downloader.Progress() if n.PeerCount() > 0 && progress.CurrentBlock >= progress.HighestBlock { n.log.Debug("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock) return nil } ticker := time.NewTicker(tickerResolution) defer ticker.Stop() progressTicker := time.NewTicker(time.Minute) defer progressTicker.Stop() for { select { case <-ctx.Done(): return errors.New("timeout during node synchronization") case <-ticker.C: if n.PeerCount() == 0 { n.log.Debug("No established connections with any peers, continue waiting for a sync") continue } if downloader.Synchronising() { n.log.Debug("Synchronization is in progress") continue } progress = downloader.Progress() if progress.CurrentBlock >= progress.HighestBlock { n.log.Info("Synchronization completed", "current block", progress.CurrentBlock, "highest block", progress.HighestBlock) return nil } n.log.Debug("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock) case <-progressTicker.C: progress = downloader.Progress() n.log.Warn("Synchronization is not finished", "current", progress.CurrentBlock, "highest", progress.HighestBlock) } } } // Discover sets up the discovery for a specific topic. func (n *StatusNode) Discover(topic string, max, min int) (err error) { if n.peerPool == nil { return errors.New("peerPool not running") } return n.peerPool.UpdateTopic(topic, params.Limits{ Max: max, Min: min, }) }