status-go/node/status_node_services.go

534 lines
15 KiB
Go

package node
import (
"encoding/json"
"errors"
"fmt"
"reflect"
logging "github.com/ipfs/go-log"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p/enode"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/appmetrics"
"github.com/status-im/status-go/common"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/mailserver"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc"
accountssvc "github.com/status-im/status-go/services/accounts"
appmetricsservice "github.com/status-im/status-go/services/appmetrics"
"github.com/status-im/status-go/services/browsers"
"github.com/status-im/status-go/services/ext"
localnotifications "github.com/status-im/status-go/services/local-notifications"
"github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/services/peer"
"github.com/status-im/status-go/services/permissions"
"github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/services/rpcfilters"
"github.com/status-im/status-go/services/rpcstats"
"github.com/status-im/status-go/services/status"
"github.com/status-im/status-go/services/subscriptions"
"github.com/status-im/status-go/services/wakuext"
"github.com/status-im/status-go/services/wakuv2ext"
"github.com/status-im/status-go/services/wallet"
"github.com/status-im/status-go/timesource"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/wakuv2"
)
var (
// ErrWakuClearIdentitiesFailure is returned when clearing waku identities has failed.
ErrWakuClearIdentitiesFailure = errors.New("failed to clear waku identities")
// ErrRPCClientUnavailable is returned if an RPC client can't be retrieved.
// This is a normal situation when a node is stopped.
ErrRPCClientUnavailable = errors.New("JSON-RPC client is unavailable")
)
// initServices assembles the services selected by config, registers each of
// them with the node and stores the resulting list on the StatusNode.
//
// Most service accessors are invoked even when their config flag is off (they
// lazily create and cache the service); the flag only decides whether the
// service ends up in the registered list. The waku, waku v2 and wallet
// services are only built when enabled. A failure to build a waku service
// aborts initialization before anything is registered.
func (b *StatusNode) initServices(config *params.NodeConfig) error {
	accountsFeed := &event.Feed{}

	var services []common.StatusService
	services = appendIf(config.UpstreamConfig.Enabled, services, b.rpcFiltersService())
	services = append(services,
		b.subscriptionService(),
		b.rpcStatsService(),
		b.appmetricsService(),
		b.peerService(),
		b.personalService(),
		b.statusPublicService(),
	)
	services = appendIf(config.EnableNTPSync, services, b.timeSource())
	services = appendIf(b.appDB != nil && b.multiaccountsDB != nil, services, b.accountsService(accountsFeed))
	services = appendIf(config.BrowsersConfig.Enabled, services, b.browsersService())
	services = appendIf(config.PermissionsConfig.Enabled, services, b.permissionsService())
	services = appendIf(config.MailserversConfig.Enabled, services, b.mailserversService())

	if config.WakuConfig.Enabled {
		wakuSvc, err := b.wakuService(&config.WakuConfig, &config.ClusterConfig)
		if err != nil {
			return err
		}
		services = append(services, wakuSvc)

		extSvc, err := b.wakuExtService(config)
		if err != nil {
			return err
		}
		b.wakuExtSrvc = extSvc
		services = append(services, extSvc)
	}

	if config.WakuV2Config.Enabled {
		wakuV2Svc, err := b.wakuV2Service(config.NodeKey, &config.WakuV2Config, &config.ClusterConfig)
		if err != nil {
			return err
		}
		services = append(services, wakuV2Svc)

		extSvc, err := b.wakuV2ExtService(config)
		if err != nil {
			return err
		}
		b.wakuV2ExtSrvc = extSvc
		services = append(services, extSvc)
	}

	if config.WalletConfig.Enabled {
		services = append(services, b.walletService(accountsFeed))
	}

	// The local notifications flag is ignored for now, as users who are
	// upgrading have no means to enable it.
	services = append(services, b.localNotificationsService(config.NetworkID))

	b.peerSrvc.SetDiscoverer(b)

	for _, svc := range services {
		b.RegisterLifecycle(svc)
	}
	b.services = services

	return nil
}
// RegisterLifecycle wires a service into the node: the methods of its public
// APIs are recorded in b.publicMethods (the set CallRPC checks against), and
// its APIs, protocols and lifecycle are registered with the geth node.
func (b *StatusNode) RegisterLifecycle(s common.StatusService) {
b.addPublicMethods(s.APIs())
b.gethNode.RegisterAPIs(s.APIs())
b.gethNode.RegisterProtocols(s.Protocols())
b.gethNode.RegisterLifecycle(s)
}
// addPublicMethods records, through reflection, the methods of every API
// flagged as public into b.publicMethods, so that an incoming call can be
// checked against the set of allowed methods. Non-public APIs are skipped.
func (b *StatusNode) addPublicMethods(apis []gethrpc.API) {
	for i := range apis {
		if !apis[i].Public {
			continue
		}
		service := reflect.ValueOf(apis[i].Service)
		addSuitableCallbacks(service, apis[i].Namespace, b.publicMethods)
	}
}
// nodeBridge builds a types.Node bridge from the geth node and whatever waku
// and waku v2 services the node currently holds.
func (b *StatusNode) nodeBridge() types.Node {
return gethbridge.NewNodeBridge(b.gethNode, b.wakuSrvc, b.wakuV2Srvc)
}
// wakuExtService returns the waku extension service, creating and caching it
// on first use. The geth node's p2p server is attached to the service on
// every call. An error is returned when the geth node is not initialized.
func (b *StatusNode) wakuExtService(config *params.NodeConfig) (*wakuext.Service, error) {
	if b.gethNode == nil {
		return nil, errors.New("geth node not initialized")
	}

	svc := b.wakuExtSrvc
	if svc == nil {
		svc = wakuext.New(config.ShhextConfig, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db)
		b.wakuExtSrvc = svc
	}
	svc.SetP2PServer(b.gethNode.Server())

	return svc, nil
}
// wakuV2ExtService returns the waku v2 extension service, creating and caching
// it on first use. The geth node's p2p server is attached to the service on
// every call. An error is returned when the geth node is not initialized.
func (b *StatusNode) wakuV2ExtService(config *params.NodeConfig) (*wakuv2ext.Service, error) {
	if b.gethNode == nil {
		return nil, errors.New("geth node not initialized")
	}

	svc := b.wakuV2ExtSrvc
	if svc == nil {
		svc = wakuv2ext.New(config.ShhextConfig, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db)
		b.wakuV2ExtSrvc = svc
	}
	svc.SetP2PServer(b.gethNode.Server())

	return svc, nil
}
// statusPublicService returns the cached status service, creating it with
// status.New on first use.
func (b *StatusNode) statusPublicService() *status.Service {
	if b.statusPublicSrvc != nil {
		return b.statusPublicSrvc
	}
	b.statusPublicSrvc = status.New()
	return b.statusPublicSrvc
}
// StatusPublicService returns the status service currently held by the node.
// Unlike statusPublicService it never creates one, so the result is nil until
// the service has been created.
func (b *StatusNode) StatusPublicService() *status.Service {
return b.statusPublicSrvc
}
// WakuService returns the waku service currently held by the node; it does
// not create one, so the result is nil until the service has been created.
func (b *StatusNode) WakuService() *waku.Waku {
return b.wakuSrvc
}
// WakuExtService returns the waku extension service currently held by the
// node; it does not create one, so the result is nil until the service has
// been created.
func (b *StatusNode) WakuExtService() *wakuext.Service {
return b.wakuExtSrvc
}
// WakuV2ExtService returns the waku v2 extension service currently held by
// the node; it does not create one, so the result is nil until the service
// has been created.
func (b *StatusNode) WakuV2ExtService() *wakuv2ext.Service {
return b.wakuV2ExtSrvc
}
// WakuV2Service returns the waku v2 service currently held by the node; it
// does not create one, so the result is nil until the service has been
// created.
func (b *StatusNode) WakuV2Service() *wakuv2.Waku {
return b.wakuV2Srvc
}
// wakuService returns the node's waku service, creating and configuring it
// from wakuCfg and clusterCfg on first use. Later calls return the cached
// instance and ignore their arguments.
//
// On first use it:
//   - overrides the default max message size and minimum PoW when the
//     configured values are greater than zero,
//   - registers a peer rate limiter when wakuCfg.EnableRateLimiter is set,
//   - wires in the node's time source,
//   - registers a mail server when wakuCfg.EnableMailServer is set,
//   - installs a 64-byte all-zero bloom filter when wakuCfg.LightClient is set.
//
// The service is cached only after every step has succeeded; on error nil is
// returned and nothing is cached.
func (b *StatusNode) wakuService(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfig) (*waku.Waku, error) {
	if b.wakuSrvc != nil {
		return b.wakuSrvc, nil
	}

	cfg := &waku.Config{
		MaxMessageSize:         wakucommon.DefaultMaxMessageSize,
		BloomFilterMode:        wakuCfg.BloomFilterMode,
		FullNode:               wakuCfg.FullNode,
		SoftBlacklistedPeerIDs: wakuCfg.SoftBlacklistedPeerIDs,
		MinimumAcceptedPoW:     params.WakuMinimumPoW,
		EnableConfirmations:    wakuCfg.EnableConfirmations,
	}
	if wakuCfg.MaxMessageSize > 0 {
		cfg.MaxMessageSize = wakuCfg.MaxMessageSize
	}
	if wakuCfg.MinimumPoW > 0 {
		cfg.MinimumAcceptedPoW = wakuCfg.MinimumPoW
	}

	w := waku.New(cfg, logutils.ZapLogger())

	if wakuCfg.EnableRateLimiter {
		w.RegisterRateLimiter(wakuRateLimiter(wakuCfg, clusterCfg))
	}

	// Named ts so that the imported timesource package is not shadowed.
	if ts := b.timeSource(); ts != nil {
		w.SetTimeSource(ts.Now)
	}

	// Enable the mail service.
	if wakuCfg.EnableMailServer {
		if err := registerWakuMailServer(w, wakuCfg); err != nil {
			// %w keeps the underlying cause reachable through
			// errors.Is/errors.As; the rendered message is unchanged.
			return nil, fmt.Errorf("failed to register WakuMailServer: %w", err)
		}
	}

	if wakuCfg.LightClient {
		emptyBloomFilter := make([]byte, 64)
		if err := w.SetBloomFilter(emptyBloomFilter); err != nil {
			return nil, err
		}
	}

	b.wakuSrvc = w
	return b.wakuSrvc, nil
}
// wakuV2Service returns the node's waku v2 service, creating it from nodeKey,
// wakuCfg and clusterCfg on first use. Later calls return the cached instance
// and ignore their arguments.
//
// On first use the host falls back to wakuv2.DefaultConfig.Host when empty,
// the max message size is overridden when the configured value is greater
// than zero, and all go-log loggers are set to the "info" level before the
// service is constructed.
//
// Any failure (including a failure to parse the log level) is returned as an
// error; nothing is cached in that case.
func (b *StatusNode) wakuV2Service(nodeKey string, wakuCfg *params.WakuV2Config, clusterCfg *params.ClusterConfig) (*wakuv2.Waku, error) {
	if b.wakuV2Srvc != nil {
		return b.wakuV2Srvc, nil
	}

	cfg := &wakuv2.Config{
		MaxMessageSize:         wakucommon.DefaultMaxMessageSize,
		SoftBlacklistedPeerIDs: wakuCfg.SoftBlacklistedPeerIDs,
		Host:                   wakuCfg.Host,
		Port:                   wakuCfg.Port,
		LightClient:            wakuCfg.LightClient,
		KeepAliveInterval:      wakuCfg.KeepAliveInterval,
		RelayNodes:             clusterCfg.RelayNodes,
		StoreNodes:             clusterCfg.StoreNodes,
		FilterNodes:            clusterCfg.FilterNodes,
		LightpushNodes:         clusterCfg.LightpushNodes,
		PeerExchange:           clusterCfg.PeerExchange,
	}
	if cfg.Host == "" {
		cfg.Host = wakuv2.DefaultConfig.Host
	}
	if wakuCfg.MaxMessageSize > 0 {
		cfg.MaxMessageSize = wakuCfg.MaxMessageSize
	}

	lvl, err := logging.LevelFromString("info")
	if err != nil {
		// This function already reports failures through its error result,
		// so propagate instead of panicking.
		return nil, err
	}
	logging.SetAllLoggers(lvl)

	w, err := wakuv2.New(nodeKey, cfg, logutils.ZapLogger())
	if err != nil {
		return nil, err
	}

	b.wakuV2Srvc = w
	return b.wakuV2Srvc, nil
}
// wakuRateLimiter builds the peer rate limiter for the waku service.
//
// The per-IP and per-peer packet/byte limits and the drop tolerance come from
// wakuCfg. The IPs and IDs of the cluster's static nodes and trusted mail
// servers are passed as the limiter's whitelists.
func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfig) *wakucommon.PeerRateLimiter {
	var (
		whitelistedIPs     []string
		whitelistedPeerIDs []enode.ID
	)

	nodes := append(parseNodes(clusterCfg.StaticNodes), parseNodes(clusterCfg.TrustedMailServers)...)
	for _, node := range nodes {
		whitelistedIPs = append(whitelistedIPs, node.IP().String())
		whitelistedPeerIDs = append(whitelistedPeerIDs, node.ID())
	}

	limits := &wakucommon.PeerRateLimiterConfig{
		PacketLimitPerSecIP:     wakuCfg.PacketRateLimitIP,
		PacketLimitPerSecPeerID: wakuCfg.PacketRateLimitPeerID,
		BytesLimitPerSecIP:      wakuCfg.BytesRateLimitIP,
		BytesLimitPerSecPeerID:  wakuCfg.BytesRateLimitPeerID,
		WhitelistedIPs:          whitelistedIPs,
		WhitelistedPeerIDs:      whitelistedPeerIDs,
	}
	dropHandler := &wakucommon.DropPeerRateLimiterHandler{
		Tolerance: wakuCfg.RateLimitTolerance,
	}

	return wakucommon.NewPeerRateLimiter(limits, &wakucommon.MetricsRateLimiterHandler{}, dropHandler)
}
// rpcFiltersService returns the cached RPC filters service, creating it with
// the node itself as the argument to rpcfilters.New on first use.
func (b *StatusNode) rpcFiltersService() *rpcfilters.Service {
	if b.rpcFiltersSrvc != nil {
		return b.rpcFiltersSrvc
	}
	b.rpcFiltersSrvc = rpcfilters.New(b)
	return b.rpcFiltersSrvc
}
// subscriptionService returns the cached subscriptions service, creating it on
// first use. The service is handed a getter that calls b.RPCClient each time
// it is invoked, rather than a client captured at creation time.
func (b *StatusNode) subscriptionService() *subscriptions.Service {
	if b.subscriptionsSrvc != nil {
		return b.subscriptionsSrvc
	}
	clientGetter := func() *rpc.Client { return b.RPCClient() }
	b.subscriptionsSrvc = subscriptions.New(clientGetter)
	return b.subscriptionsSrvc
}
// rpcStatsService returns the cached RPC stats service, creating it with
// rpcstats.New on first use.
func (b *StatusNode) rpcStatsService() *rpcstats.Service {
	if b.rpcStatsSrvc != nil {
		return b.rpcStatsSrvc
	}
	b.rpcStatsSrvc = rpcstats.New()
	return b.rpcStatsSrvc
}
// accountsService returns the cached accounts service. On first use it is
// built from an accounts DB over b.appDB, the multiaccounts DB, the geth
// account manager and accountsFeed; on later calls accountsFeed is ignored.
func (b *StatusNode) accountsService(accountsFeed *event.Feed) *accountssvc.Service {
	if b.accountsSrvc != nil {
		return b.accountsSrvc
	}
	accountsDB := accounts.NewDB(b.appDB)
	b.accountsSrvc = accountssvc.NewService(accountsDB, b.multiaccountsDB, b.gethAccountManager.Manager, accountsFeed)
	return b.accountsSrvc
}
// browsersService returns the cached browsers service, creating it over a
// browsers DB backed by b.appDB on first use.
func (b *StatusNode) browsersService() *browsers.Service {
	if b.browsersSrvc != nil {
		return b.browsersSrvc
	}
	db := browsers.NewDB(b.appDB)
	b.browsersSrvc = browsers.NewService(db)
	return b.browsersSrvc
}
// permissionsService returns the cached permissions service, creating it over
// a permissions DB backed by b.appDB on first use.
func (b *StatusNode) permissionsService() *permissions.Service {
	if b.permissionsSrvc != nil {
		return b.permissionsSrvc
	}
	db := permissions.NewDB(b.appDB)
	b.permissionsSrvc = permissions.NewService(db)
	return b.permissionsSrvc
}
// mailserversService returns the cached mailservers service, creating it over
// a mailservers DB backed by b.appDB on first use.
func (b *StatusNode) mailserversService() *mailservers.Service {
	if b.mailserversSrvc != nil {
		return b.mailserversSrvc
	}
	db := mailservers.NewDB(b.appDB)
	b.mailserversSrvc = mailservers.NewService(db)
	return b.mailserversSrvc
}
// appmetricsService returns the cached app metrics service, creating it over
// an appmetrics DB backed by b.appDB on first use.
func (b *StatusNode) appmetricsService() common.StatusService {
	if b.appMetricsSrvc != nil {
		return b.appMetricsSrvc
	}
	db := appmetrics.NewDB(b.appDB)
	b.appMetricsSrvc = appmetricsservice.NewService(db)
	return b.appMetricsSrvc
}
// walletService returns the cached wallet service. On first use it is built
// from b.appDB, the node's RPC client and accountsFeed; on later calls
// accountsFeed is ignored.
func (b *StatusNode) walletService(accountsFeed *event.Feed) common.StatusService {
	if b.walletSrvc != nil {
		return b.walletSrvc
	}
	b.walletSrvc = wallet.NewService(b.appDB, b.rpcClient, accountsFeed)
	return b.walletSrvc
}
// localNotificationsService returns the cached local notifications service.
// On first use it is built from b.appDB and the given network ID; on later
// calls network is ignored.
func (b *StatusNode) localNotificationsService(network uint64) *localnotifications.Service {
	if b.localNotificationsSrvc != nil {
		return b.localNotificationsSrvc
	}
	b.localNotificationsSrvc = localnotifications.NewService(b.appDB, network)
	return b.localNotificationsSrvc
}
// peerService returns the cached peer service, creating it with peer.New on
// first use.
func (b *StatusNode) peerService() *peer.Service {
	if b.peerSrvc != nil {
		return b.peerSrvc
	}
	b.peerSrvc = peer.New()
	return b.peerSrvc
}
// registerWakuMailServer creates a new WakuMailServer, registers it with
// wakuService and then initializes it with config. The mail server is
// registered before Init runs; the error from Init is returned.
func registerWakuMailServer(wakuService *waku.Waku, config *params.WakuConfig) (err error) {
	mailServer := new(mailserver.WakuMailServer)
	wakuService.RegisterMailServer(mailServer)
	err = mailServer.Init(wakuService, config)
	return err
}
// appendIf returns services with service appended when condition is true, and
// services unchanged otherwise. The service argument is evaluated by the
// caller either way.
func appendIf(condition bool, services []common.StatusService, service common.StatusService) []common.StatusService {
	if condition {
		services = append(services, service)
	}
	return services
}
// RPCFiltersService returns the RPC filters service currently held by the
// node; it does not create one, so the result is nil until the service has
// been created.
func (b *StatusNode) RPCFiltersService() *rpcfilters.Service {
return b.rpcFiltersSrvc
}
// StopLocalNotifications stops the local notifications service when it exists
// and is currently started. A failure to stop is logged and not propagated:
// the returned error is always nil.
func (b *StatusNode) StopLocalNotifications() error {
	svc := b.localNotificationsSrvc
	if svc == nil || !svc.IsStarted() {
		return nil
	}

	if err := svc.Stop(); err != nil {
		b.log.Error("LocalNotifications service stop failed on StopLocalNotifications", "error", err)
	}
	return nil
}
// StartLocalNotifications starts the local notifications service (unless it
// is already started) and subscribes it to the wallet service's feed. It does
// nothing when either the local notifications or the wallet service is
// missing.
//
// Failures are logged and not propagated: the returned error is always nil.
// When starting the service fails, the wallet subscription is not attempted.
func (b *StatusNode) StartLocalNotifications() error {
	if b.localNotificationsSrvc == nil || b.walletSrvc == nil {
		return nil
	}

	notifications := b.localNotificationsSrvc
	if !notifications.IsStarted() {
		if err := notifications.Start(); err != nil {
			b.log.Error("LocalNotifications service start failed on StartLocalNotifications", "error", err)
			return nil
		}
	}

	if err := notifications.SubscribeWallet(b.walletSrvc.GetFeed()); err != nil {
		b.log.Error("LocalNotifications service could not subscribe to wallet on StartLocalNotifications", "error", err)
	}
	return nil
}
// personalService returns the cached personal service, creating it from the
// node's accounts manager on first use.
//
// The `personal_sign` and `personal_ecRecover` methods are important to keep
// DApps working. They are usually provided by an ETH or a LES service, but
// neither is started when using upstream, so the node runs its own
// implementation.
func (b *StatusNode) personalService() *personal.Service {
	if b.personalSrvc != nil {
		return b.personalSrvc
	}
	b.personalSrvc = personal.New(b.accountsManager)
	return b.personalSrvc
}
// timeSource returns the cached NTP time source, setting it to
// timesource.Default() on first use.
func (b *StatusNode) timeSource() *timesource.NTPTimeSource {
	if b.timeSourceSrvc != nil {
		return b.timeSourceSrvc
	}
	b.timeSourceSrvc = timesource.Default()
	return b.timeSourceSrvc
}
// Cleanup deletes the waku service's key pairs (when a waku service exists)
// and stops the wallet service when the wallet is enabled in the node config
// and the service is currently started.
//
// A key-pair deletion failure is returned wrapping
// ErrWakuClearIdentitiesFailure, so callers can detect it with errors.Is; in
// that case the wallet service is left untouched. A wallet stop failure is
// returned as is.
func (b *StatusNode) Cleanup() error {
	if b.wakuSrvc != nil {
		if err := b.wakuSrvc.DeleteKeyPairs(); err != nil {
			// %w (instead of %s) keeps the sentinel in the error chain; the
			// rendered message is unchanged.
			return fmt.Errorf("%w: %v", ErrWakuClearIdentitiesFailure, err)
		}
	}

	cfg := b.Config()
	if cfg == nil || !cfg.WalletConfig.Enabled {
		return nil
	}
	if b.walletSrvc == nil || !b.walletSrvc.IsStarted() {
		return nil
	}
	return b.walletSrvc.Stop()
}
// RPCCall captures the "method" field of a JSON-RPC request. CallRPC decodes
// incoming requests into it to check the method against the public methods.
type RPCCall struct {
Method string `json:"method"`
}
// CallPrivateRPC passes inputJSON to the node's RPC client and returns the raw
// response, without checking the method against the public methods. The node
// mutex is held for the duration of the call.
// It returns ErrRPCClientUnavailable when the node has no RPC client.
func (b *StatusNode) CallPrivateRPC(inputJSON string) (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if client := b.rpcClient; client != nil {
		return client.CallRaw(inputJSON), nil
	}
	return "", ErrRPCClientUnavailable
}
// CallRPC executes a request against the node's RPC client, but only for
// methods registered as public. The node mutex is held for the duration of
// the call.
//
// It returns ErrRPCClientUnavailable when the node has no RPC client, and the
// decoding error when inputJSON cannot be unmarshalled into an RPCCall. When
// the method is empty or not in the public methods map, ErrRPCMethodUnavailable
// is returned as the response string with a nil error.
func (b *StatusNode) CallRPC(inputJSON string) (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.rpcClient == nil {
		return "", ErrRPCClientUnavailable
	}

	var call RPCCall
	if err := json.Unmarshal([]byte(inputJSON), &call); err != nil {
		return "", err
	}

	if allowed := call.Method != "" && b.publicMethods[call.Method]; !allowed {
		return ErrRPCMethodUnavailable, nil
	}

	return b.rpcClient.CallRaw(inputJSON), nil
}