diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 9c93b3f0..eeedb3a8 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -8,6 +8,8 @@ import ( "github.com/waku-org/go-waku/cmd/waku/keygen" "github.com/waku-org/go-waku/cmd/waku/rlngenerate" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" ) var options NodeOptions @@ -113,7 +115,16 @@ func main() { Before: altsrc.InitInputSourceWithContext(cliFlags, altsrc.NewTomlSourceFromFlagFunc("config-file")), Flags: cliFlags, Action: func(c *cli.Context) error { - Execute(options) + err := Execute(options) + if err != nil { + utils.Logger().Error("failure while executing wakunode", zap.Error(err)) + switch e := err.(type) { + case cli.ExitCoder: + return e + case error: + return cli.Exit(err.Error(), 1) + } + } return nil }, Commands: []*cli.Command{ diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 7edef337..fab591f7 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -5,7 +5,6 @@ import ( "crypto/ecdsa" "database/sql" "encoding/json" - "errors" "fmt" "net" "os" @@ -17,6 +16,7 @@ import ( rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/pbnjay/memory" "github.com/prometheus/client_golang/prometheus" + "github.com/urfave/cli/v2" dbutils "github.com/waku-org/go-waku/waku/persistence/utils" wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" @@ -56,12 +56,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) -func failOnErr(err error, msg string) { - if err != nil { - utils.Logger().Fatal(msg, zap.Error(err)) - } -} - func requiresDB(options NodeOptions) bool { return options.Store.Enable || options.Rendezvous.Enable } @@ -80,21 +74,37 @@ func scalePerc(value float64) float64 { const dialTimeout = 7 * time.Second +func nonRecoverErrorMsg(format string, a ...any) error { + err := fmt.Errorf(format, a...) + return nonRecoverError(err) +} + +func nonRecoverError(err error) error { + return cli.Exit(err.Error(), 166) +} + // Execute starts a go-waku node with settings determined by the Options parameter -func Execute(options NodeOptions) { +func Execute(options NodeOptions) error { // Set encoding for logs (console, json, ...) // Note that libp2p reads the encoding from GOLOG_LOG_FMT env var. utils.InitLogger(options.LogEncoding, options.LogOutput) hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", options.Address, options.Port)) - failOnErr(err, "invalid host address") + if err != nil { + return nonRecoverErrorMsg("invalid host address: %w", err) + } prvKey, err := getPrivKey(options) - failOnErr(err, "nodekey error") + if err != nil { + return err + } p2pPrvKey := utils.EcdsaPrivKeyToSecp256k1PrivKey(prvKey) id, err := peer.IDFromPublicKey(p2pPrvKey.GetPublic()) - failOnErr(err, "deriving peer ID from private key") + if err != nil { + return err + } + logger := utils.Logger().With(logging.HostID("node", id)) var db *sql.DB @@ -104,7 +114,9 @@ func Execute(options NodeOptions) { Vacuum: options.Store.Vacuum, } db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger) - failOnErr(err, "Could not connect to DB") + if err != nil { + return nonRecoverErrorMsg("could not connect to DB: %w", err) + } } ctx := context.Background() @@ -117,7 +129,7 @@ func Execute(options NodeOptions) { lvl, err := zapcore.ParseLevel(options.LogLevel) if err != nil { - failOnErr(err, "log level error") + return err } nodeOpts := []node.WakuNodeOption{ @@ -137,8 +149,9 @@ func Execute(options NodeOptions) { if options.ExtIP != "" { ip := net.ParseIP(options.ExtIP) if ip == nil { - failOnErr(errors.New("invalid IP address"), "could not set external IP address") + return nonRecoverErrorMsg("could not set external IP address: invalid IP") } + nodeOpts = append(nodeOpts, node.WithExternalIP(ip)) } @@ -155,7 +168,9 @@ func Execute(options NodeOptions) { limits := rcmgr.DefaultLimits // Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB scaledLimits := limits.Scale(int64(float64(memory.TotalMemory())*memPerc/100), int(float64(getNumFDs())*fdPerc/100)) resourceManager, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(scaledLimits)) - failOnErr(err, "setting resource limits") + if err != nil { + return fmt.Errorf("could not set resource limits: %w", err) + } libp2pOpts = append(libp2pOpts, libp2p.ResourceManager(resourceManager)) libp2p.SetDefaultServiceLimits(&limits) @@ -179,7 +194,7 @@ func Execute(options NodeOptions) { logger.Warn("node forced to be publicly reachable!") libp2pOpts = append(libp2pOpts, libp2p.ForceReachabilityPublic()) } else { - failOnErr(errors.New("invalid reachability value"), "Reachability") + return nonRecoverErrorMsg("invalid reachability value") } } @@ -197,18 +212,23 @@ func Execute(options NodeOptions) { if options.ShowAddresses { printListeningAddresses(ctx, nodeOpts, options) - return + return nil } if options.Store.Enable && options.PersistPeers { // Create persistent peerstore queries, err := dbutils.NewQueries("peerstore", db) - failOnErr(err, "Peerstore") + if err != nil { + return nonRecoverErrorMsg("could not setup persistent peerstore database: %w", err) + + } datastore := dssql.NewDatastore(db, queries) opts := pstoreds.DefaultOpts() peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts) - failOnErr(err, "Peerstore") + if err != nil { + return nonRecoverErrorMsg("could not create persistent peerstore: %w", err) + } nodeOpts = append(nodeOpts, node.WithPeerStore(peerStore)) } @@ -244,7 +264,10 @@ func Execute(options NodeOptions) { } dbStore, err = persistence.NewDBStore(prometheus.DefaultRegisterer, logger, dbOptions...) - failOnErr(err, "DBStore") + if err != nil { + return nonRecoverErrorMsg("error setting up db store: %w", err) + } + nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } @@ -275,7 +298,7 @@ func Execute(options NodeOptions) { } } } else { - logger.Fatal("DNS discovery URL is required") + return nonRecoverErrorMsg("DNS discovery URL is required") } } @@ -307,26 +330,36 @@ func Execute(options NodeOptions) { nodeOpts = append(nodeOpts, node.WithRendezvous(rdb)) } - checkForRLN(logger, options, &nodeOpts) - - wakuNode, err := node.New(nodeOpts...) - utils.Logger().Info("Version details ", zap.String("version", node.Version), zap.String("commit", node.GitCommit)) - failOnErr(err, "Wakunode") + if err = checkForRLN(logger, options, &nodeOpts); err != nil { + return nonRecoverError(err) + } + + wakuNode, err := node.New(nodeOpts...) + if err != nil { + return fmt.Errorf("could not instantiate waku: %w", err) + } + //Process pubSub and contentTopics specified and arrive at all corresponding pubSubTopics - pubSubTopicMap := processTopics(options) + pubSubTopicMap, err := processTopics(options) + if err != nil { + return nonRecoverError(err) + } + pubSubTopicMapKeys := make([]string, 0, len(pubSubTopicMap)) for k := range pubSubTopicMap { pubSubTopicMapKeys = append(pubSubTopicMapKeys, k) } if options.Filter.UseV1 { - addStaticPeers(wakuNode, options.Filter.NodesV1, pubSubTopicMapKeys, legacy_filter.FilterID_v20beta1) + if err := addStaticPeers(wakuNode, options.Filter.NodesV1, pubSubTopicMapKeys, legacy_filter.FilterID_v20beta1); err != nil { + return err + } } if err = wakuNode.Start(ctx); err != nil { - logger.Fatal("starting waku node", zap.Error(err)) + return nonRecoverError(err) } for _, d := range discoveredNodes { @@ -334,10 +367,17 @@ func Execute(options NodeOptions) { } //For now assuming that static peers added support/listen on all topics specified via commandLine. - addStaticPeers(wakuNode, options.Store.Nodes, pubSubTopicMapKeys, store.StoreID_v20beta4) - addStaticPeers(wakuNode, options.LightPush.Nodes, pubSubTopicMapKeys, lightpush.LightPushID_v20beta1) - addStaticPeers(wakuNode, options.Rendezvous.Nodes, pubSubTopicMapKeys, rendezvous.RendezvousID) - addStaticPeers(wakuNode, options.Filter.Nodes, pubSubTopicMapKeys, filter.FilterSubscribeID_v20beta1) + staticPeers := map[protocol.ID][]multiaddr.Multiaddr{ + store.StoreID_v20beta4: options.Store.Nodes, + lightpush.LightPushID_v20beta1: options.LightPush.Nodes, + rendezvous.RendezvousID: options.Rendezvous.Nodes, + filter.FilterSubscribeID_v20beta1: options.Filter.Nodes, + } + for protocolID, peers := range staticPeers { + if err = addStaticPeers(wakuNode, peers, pubSubTopicMapKeys, protocolID); err != nil { + return err + } + } var wg sync.WaitGroup @@ -345,7 +385,10 @@ func Execute(options NodeOptions) { for nodeTopic := range pubSubTopicMap { nodeTopic := nodeTopic sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) - failOnErr(err, "Error subscring to topic") + if err != nil { + return err + } + sub.Unsubscribe() if len(options.Rendezvous.Nodes) != 0 { @@ -401,8 +444,9 @@ func Execute(options NodeOptions) { } for _, protectedTopic := range options.Relay.ProtectedTopics { - err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey) - failOnErr(err, "Error adding signed topic validator") + if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil { + return nonRecoverErrorMsg("could not add signed topic validator: %w", err) + } } } @@ -479,27 +523,33 @@ func Execute(options NodeOptions) { wakuNode.Stop() if options.RPCServer.Enable { - err := rpcServer.Stop(ctx) - failOnErr(err, "RPCClose") + if err := rpcServer.Stop(ctx); err != nil { + return err + } } if options.RESTServer.Enable { - err := restServer.Stop(ctx) - failOnErr(err, "RESTClose") + if err := restServer.Stop(ctx); err != nil { + return err + } } if options.Metrics.Enable { - err = metricsServer.Stop(ctx) - failOnErr(err, "MetricsClose") + if err = metricsServer.Stop(ctx); err != nil { + return err + } } - if options.Store.Enable { - err = db.Close() - failOnErr(err, "DBClose") + if db != nil { + if err = db.Close(); err != nil { + return err + } } + + return nil } -func processTopics(options NodeOptions) map[string]struct{} { +func processTopics(options NodeOptions) (map[string]struct{}, error) { //Using a map to avoid duplicate pub-sub topics that can result from autosharding // or same-topic being passed twice. pubSubTopicMap := make(map[string]struct{}) @@ -516,7 +566,7 @@ func processTopics(options NodeOptions) map[string]struct{} { for _, cTopic := range options.Relay.ContentTopics.Value() { contentTopic, err := wprotocol.StringToContentTopic(cTopic) if err != nil { - failOnErr(err, "failed to parse content topic") + return nil, err } pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount) pubSubTopicMap[pTopic.String()] = struct{}{} @@ -526,14 +576,17 @@ func processTopics(options NodeOptions) map[string]struct{} { pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{} } - return pubSubTopicMap + return pubSubTopicMap, nil } -func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) { +func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) error { for _, addr := range addresses { _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, pubSubTopics, protocols...) - failOnErr(err, "error adding peer") + if err != nil { + return fmt.Errorf("could not add static peer: %w", err) + } } + return nil } func loadPrivateKeyFromFile(path string, passwd string) (*ecdsa.PrivateKey, error) { diff --git a/cmd/waku/node_no_rln.go b/cmd/waku/node_no_rln.go index de285e5a..9756cee3 100644 --- a/cmd/waku/node_no_rln.go +++ b/cmd/waku/node_no_rln.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" ) -func checkForRLN(logger *zap.Logger, options NodeOptions, nodeOpts *[]node.WakuNodeOption) { +func checkForRLN(logger *zap.Logger, options NodeOptions, nodeOpts *[]node.WakuNodeOption) error { // Do nothing + return nil } diff --git a/cmd/waku/node_rln.go b/cmd/waku/node_rln.go index 703660f2..d6dc5b73 100644 --- a/cmd/waku/node_rln.go +++ b/cmd/waku/node_rln.go @@ -11,11 +11,12 @@ import ( "go.uber.org/zap" ) -func checkForRLN(logger *zap.Logger, options NodeOptions, nodeOpts *[]node.WakuNodeOption) { +func checkForRLN(logger *zap.Logger, options NodeOptions, nodeOpts *[]node.WakuNodeOption) error { if options.RLNRelay.Enable { if !options.Relay.Enable { - failOnErr(errors.New("relay not available"), "Could not enable RLN Relay") + return errors.New("waku relay is required to enable RLN relay") } + if !options.RLNRelay.Dynamic { *nodeOpts = append(*nodeOpts, node.WithStaticRLNRelay((*rln.MembershipIndex)(options.RLNRelay.MembershipIndex), nil)) } else { @@ -32,4 +33,6 @@ func checkForRLN(logger *zap.Logger, options NodeOptions, nodeOpts *[]node.WakuN )) } } + + return nil }