support memory only waku store

This commit is contained in:
Richard Ramos 2021-04-18 20:03:16 -04:00
parent 57e36021bb
commit ef67ff356f
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
3 changed files with 53 additions and 24 deletions

View File

@ -3,6 +3,7 @@ package waku
import ( import (
"context" "context"
"crypto/rand" "crypto/rand"
"database/sql"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -54,6 +55,7 @@ var rootCmd = &cobra.Command{
relay, _ := cmd.Flags().GetBool("relay") relay, _ := cmd.Flags().GetBool("relay")
key, _ := cmd.Flags().GetString("nodekey") key, _ := cmd.Flags().GetString("nodekey")
store, _ := cmd.Flags().GetBool("store") store, _ := cmd.Flags().GetBool("store")
useDB, _ := cmd.Flags().GetBool("use-db")
dbPath, _ := cmd.Flags().GetString("dbpath") dbPath, _ := cmd.Flags().GetString("dbpath")
storenode, _ := cmd.Flags().GetString("storenode") storenode, _ := cmd.Flags().GetString("storenode")
staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes") staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes")
@ -70,39 +72,54 @@ var rootCmd = &cobra.Command{
prvKey, err := crypto.HexToECDSA(key) prvKey, err := crypto.HexToECDSA(key)
if dbPath == "" { if dbPath == "" && useDB {
checkError(errors.New("dbpath can't be null"), "") checkError(errors.New("dbpath can't be null"), "")
} }
db, err := sqlite.NewDB(dbPath) var db *sql.DB
checkError(err, "Could not connect to DB")
if useDB {
db, err = sqlite.NewDB(dbPath)
checkError(err, "Could not connect to DB")
}
ctx := context.Background() ctx := context.Background()
// Create persistent peerstore
queries, err := sqlite.NewQueries("peerstore", db)
checkError(err, "Peerstore")
datastore := dssql.NewDatastore(db, queries)
opts := pstoreds.DefaultOpts()
peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts)
checkError(err, "Peerstore")
nodeOpts := []node.WakuNodeOption{ nodeOpts := []node.WakuNodeOption{
node.WithPrivateKey(prvKey), node.WithPrivateKey(prvKey),
node.WithHostAddress([]net.Addr{hostAddr}), node.WithHostAddress([]net.Addr{hostAddr}),
node.WithLibP2POptions(append(node.DefaultLibP2POptions, libp2p.Peerstore(peerStore))...),
} }
libp2pOpts := node.DefaultLibP2POptions
if useDB {
// Create persistent peerstore
queries, err := sqlite.NewQueries("peerstore", db)
checkError(err, "Peerstore")
datastore := dssql.NewDatastore(db, queries)
opts := pstoreds.DefaultOpts()
peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts)
checkError(err, "Peerstore")
libp2pOpts = append(libp2pOpts, libp2p.Peerstore(peerStore))
}
nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...))
if relay { if relay {
nodeOpts = append(nodeOpts, node.WithWakuRelay()) nodeOpts = append(nodeOpts, node.WithWakuRelay())
} }
if store { if store {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
checkError(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithWakuStore(true)) nodeOpts = append(nodeOpts, node.WithWakuStore(true))
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) if useDB {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
checkError(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
} else {
nodeOpts = append(nodeOpts, node.WithMessageProvider(nil))
}
} }
wakuNode, err := node.New(ctx, nodeOpts...) wakuNode, err := node.New(ctx, nodeOpts...)
@ -139,8 +156,11 @@ var rootCmd = &cobra.Command{
// shut the node down // shut the node down
wakuNode.Stop() wakuNode.Stop()
err = db.Close()
checkError(err, "DBClose") if useDB {
err = db.Close()
checkError(err, "DBClose")
}
}, },
} }
@ -158,6 +178,7 @@ func init() {
rootCmd.Flags().StringSlice("topics", []string{string(node.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", node.DefaultWakuTopic)) rootCmd.Flags().StringSlice("topics", []string{string(node.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", node.DefaultWakuTopic))
rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated") rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated")
rootCmd.Flags().Bool("store", false, "Enable store protocol") rootCmd.Flags().Bool("store", false, "Enable store protocol")
rootCmd.Flags().Bool("use-db", true, "Store messages and peers in a DB, (default: true, use false for in-memory only)")
rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file") rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file")
rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol") rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol")
rootCmd.Flags().Bool("relay", true, "Enable relay protocol") rootCmd.Flags().Bool("relay", true, "Enable relay protocol")

View File

@ -75,7 +75,7 @@ func WithWakuStore(shouldStoreMessages bool) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableStore = true params.enableStore = true
params.storeMsgs = shouldStoreMessages params.storeMsgs = shouldStoreMessages
params.store = store.NewWakuStore(params.ctx, nil) params.store = store.NewWakuStore(params.ctx, shouldStoreMessages, nil)
return nil return nil
} }
} }
@ -85,7 +85,7 @@ func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
if params.store != nil { if params.store != nil {
params.store.SetMsgProvider(s) params.store.SetMsgProvider(s)
} else { } else {
params.store = store.NewWakuStore(params.ctx, s) params.store = store.NewWakuStore(params.ctx, true, s)
} }
return nil return nil
} }

View File

@ -182,16 +182,18 @@ type WakuStore struct {
messages []IndexedWakuMessage messages []IndexedWakuMessage
messagesMutex sync.Mutex messagesMutex sync.Mutex
storeMsgs bool
msgProvider MessageProvider msgProvider MessageProvider
h host.Host h host.Host
ctx context.Context ctx context.Context
} }
func NewWakuStore(ctx context.Context, p MessageProvider) *WakuStore { func NewWakuStore(ctx context.Context, shouldStoreMessages bool, p MessageProvider) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.MsgC = make(chan *common.Envelope) wakuStore.MsgC = make(chan *common.Envelope)
wakuStore.msgProvider = p wakuStore.msgProvider = p
wakuStore.ctx = ctx wakuStore.ctx = ctx
wakuStore.storeMsgs = shouldStoreMessages
return wakuStore return wakuStore
} }
@ -203,12 +205,20 @@ func (store *WakuStore) SetMsgProvider(p MessageProvider) {
func (store *WakuStore) Start(h host.Host) { func (store *WakuStore) Start(h host.Host) {
store.h = h store.h = h
if store.msgProvider == nil { if !store.storeMsgs {
log.Info("Store protocol started (messages aren't stored)")
return return
} }
store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest) store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest)
go store.storeIncomingMessages()
if store.msgProvider == nil {
log.Info("Store protocol started (no message provider)")
return
}
messages, err := store.msgProvider.GetAll() messages, err := store.msgProvider.GetAll()
if err != nil { if err != nil {
log.Error("could not load DBProvider messages") log.Error("could not load DBProvider messages")
@ -225,8 +235,6 @@ func (store *WakuStore) Start(h host.Host) {
} }
log.Info("Store protocol started") log.Info("Store protocol started")
go store.storeIncomingMessages()
} }
func (store *WakuStore) storeIncomingMessages() { func (store *WakuStore) storeIncomingMessages() {