diff --git a/waku/node.go b/waku/node.go index 96d4d3fb..c9ec4344 100644 --- a/waku/node.go +++ b/waku/node.go @@ -3,6 +3,7 @@ package waku import ( "context" "crypto/rand" + "database/sql" "encoding/hex" "errors" "fmt" @@ -54,6 +55,7 @@ var rootCmd = &cobra.Command{ relay, _ := cmd.Flags().GetBool("relay") key, _ := cmd.Flags().GetString("nodekey") store, _ := cmd.Flags().GetBool("store") + useDB, _ := cmd.Flags().GetBool("use-db") dbPath, _ := cmd.Flags().GetString("dbpath") storenode, _ := cmd.Flags().GetString("storenode") staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes") @@ -70,39 +72,54 @@ var rootCmd = &cobra.Command{ prvKey, err := crypto.HexToECDSA(key) - if dbPath == "" { + if dbPath == "" && useDB { checkError(errors.New("dbpath can't be null"), "") } - db, err := sqlite.NewDB(dbPath) - checkError(err, "Could not connect to DB") + var db *sql.DB + + if useDB { + db, err = sqlite.NewDB(dbPath) + checkError(err, "Could not connect to DB") + } ctx := context.Background() - // Create persistent peerstore - queries, err := sqlite.NewQueries("peerstore", db) - checkError(err, "Peerstore") - - datastore := dssql.NewDatastore(db, queries) - opts := pstoreds.DefaultOpts() - peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts) - checkError(err, "Peerstore") - nodeOpts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), node.WithHostAddress([]net.Addr{hostAddr}), - node.WithLibP2POptions(append(node.DefaultLibP2POptions, libp2p.Peerstore(peerStore))...), } + libp2pOpts := node.DefaultLibP2POptions + + if useDB { + // Create persistent peerstore + queries, err := sqlite.NewQueries("peerstore", db) + checkError(err, "Peerstore") + + datastore := dssql.NewDatastore(db, queries) + opts := pstoreds.DefaultOpts() + peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts) + checkError(err, "Peerstore") + + libp2pOpts = append(libp2pOpts, libp2p.Peerstore(peerStore)) + } + + nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...)) + if relay { nodeOpts = append(nodeOpts, node.WithWakuRelay()) } if store { - dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) - checkError(err, "DBStore") nodeOpts = append(nodeOpts, node.WithWakuStore(true)) - nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) + if useDB { + dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) + checkError(err, "DBStore") + nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) + } else { + nodeOpts = append(nodeOpts, node.WithMessageProvider(nil)) + } } wakuNode, err := node.New(ctx, nodeOpts...) @@ -139,8 +156,11 @@ var rootCmd = &cobra.Command{ // shut the node down wakuNode.Stop() - err = db.Close() - checkError(err, "DBClose") + + if useDB { + err = db.Close() + checkError(err, "DBClose") + } }, } @@ -158,6 +178,7 @@ func init() { rootCmd.Flags().StringSlice("topics", []string{string(node.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", node.DefaultWakuTopic)) rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated") rootCmd.Flags().Bool("store", false, "Enable store protocol") + rootCmd.Flags().Bool("use-db", true, "Store messages and peers in a DB, (default: true, use false for in-memory only)") rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file") rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol") rootCmd.Flags().Bool("relay", true, "Enable relay protocol") diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 6dc00bd4..25a4515e 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -75,7 +75,7 @@ func WithWakuStore(shouldStoreMessages bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true params.storeMsgs = shouldStoreMessages - params.store = store.NewWakuStore(params.ctx, nil) + params.store = store.NewWakuStore(params.ctx, shouldStoreMessages, nil) return nil } } @@ -85,7 +85,7 @@ func WithMessageProvider(s store.MessageProvider) WakuNodeOption { if params.store != nil { params.store.SetMsgProvider(s) } else { - params.store = store.NewWakuStore(params.ctx, s) + params.store = store.NewWakuStore(params.ctx, true, s) } return nil } diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index 2f188196..0d9bf215 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -182,16 +182,18 @@ type WakuStore struct { messages []IndexedWakuMessage messagesMutex sync.Mutex + storeMsgs bool msgProvider MessageProvider h host.Host ctx context.Context } -func NewWakuStore(ctx context.Context, p MessageProvider) *WakuStore { +func NewWakuStore(ctx context.Context, shouldStoreMessages bool, p MessageProvider) *WakuStore { wakuStore := new(WakuStore) wakuStore.MsgC = make(chan *common.Envelope) wakuStore.msgProvider = p wakuStore.ctx = ctx + wakuStore.storeMsgs = shouldStoreMessages return wakuStore } @@ -203,12 +205,20 @@ func (store *WakuStore) SetMsgProvider(p MessageProvider) { func (store *WakuStore) Start(h host.Host) { store.h = h - if store.msgProvider == nil { + if !store.storeMsgs { + log.Info("Store protocol started (messages aren't stored)") return } store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest) + go store.storeIncomingMessages() + + if store.msgProvider == nil { + log.Info("Store protocol started (no message provider)") + return + } + messages, err := store.msgProvider.GetAll() if err != nil { log.Error("could not load DBProvider messages") @@ -225,8 +235,6 @@ func (store *WakuStore) Start(h host.Host) { } log.Info("Store protocol started") - - go store.storeIncomingMessages() } func (store *WakuStore) storeIncomingMessages() {