From fa79f9a864bb7a99c470d8d65c1b8294a5e1b7bd Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 12 Apr 2021 13:59:41 -0400 Subject: [PATCH] persist messages on sqlite db --- .gitignore | 2 + cmd/root.go | 61 +++++-------------------------- cmd/store.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + 5 files changed, 116 insertions(+), 51 deletions(-) create mode 100644 cmd/store.go diff --git a/.gitignore b/.gitignore index 66fd13c9..55f84c6a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +*.db + # Binaries for programs and plugins *.exe *.exe~ diff --git a/cmd/root.go b/cmd/root.go index 72fd8cbc..4d4b7de1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -14,8 +14,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/status-im/go-waku/waku/v2/node" - "github.com/status-im/go-waku/waku/v2/protocol" - store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" ) func randomHex(n int) (string, error) { @@ -26,25 +24,6 @@ func randomHex(n int) (string, error) { return hex.EncodeToString(bytes), nil } -type DBStore struct { - store.MessageProvider -} - -func (dbStore *DBStore) Put(message *protocol.WakuMessage) error { - fmt.Println("TODO: Implement MessageProvider.Put") - return nil -} - -func (dbStore *DBStore) GetAll() ([]*protocol.WakuMessage, error) { - fmt.Println("TODO: Implement MessageProvider.GetAll. Returning a sample message") - exampleMessage := new(protocol.WakuMessage) - exampleMessage.ContentTopic = "dingpu" - exampleMessage.Payload = []byte("Hello!") - exampleMessage.Version = 0 - - return []*protocol.WakuMessage{exampleMessage}, nil -} - var rootCmd = &cobra.Command{ Use: "waku", Short: "Start a waku node", @@ -56,9 +35,9 @@ var rootCmd = &cobra.Command{ relay, _ := cmd.Flags().GetBool("relay") key, _ := cmd.Flags().GetString("nodekey") store, _ := cmd.Flags().GetBool("store") + dbPath, _ := cmd.Flags().GetString("dbpath") storenode, _ := cmd.Flags().GetString("storenode") staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes") - query, _ := cmd.Flags().GetBool("query") hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port)) @@ -84,8 +63,14 @@ var rootCmd = &cobra.Command{ wakuNode.MountRelay() } - if store { - err := wakuNode.MountStore(new(DBStore)) + if store && dbPath != "" { + db, err := NewDBStore(dbPath) + if err != nil { + fmt.Println(err) + return + } + + err = wakuNode.MountStore(db) if err != nil { fmt.Println(err) return @@ -108,32 +93,6 @@ var rootCmd = &cobra.Command{ } } - if query { - if !store { - fmt.Println("Store protocol was not started") - return - } - - var DefaultContentTopic string = "dingpu" - - response, err := wakuNode.Query(DefaultContentTopic, true, 10) - if err != nil { - fmt.Println(err) - return - } - - fmt.Println(fmt.Sprint("Page Size: ", response.PagingInfo.PageSize)) - fmt.Println(fmt.Sprint("Direction: ", response.PagingInfo.Direction)) - if response.PagingInfo.Cursor != nil { - fmt.Println(fmt.Sprint("Cursor - ReceivedTime: ", response.PagingInfo.Cursor.ReceivedTime)) - fmt.Println(fmt.Sprint("Cursor - Digest: ", hex.EncodeToString(response.PagingInfo.Cursor.Digest))) - } - fmt.Println("Messages:") - for i, msg := range response.Messages { - fmt.Println(fmt.Sprint(i, "- ", string(msg.Payload))) // Normaly you'd have to decode these, but i'm using v0 - } - } - // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) @@ -158,9 +117,9 @@ func init() { rootCmd.Flags().String("nodekey", "", "P2P node private key as hex (default random)") rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated") rootCmd.Flags().Bool("store", false, "Enable store protocol") + rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file") rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol") rootCmd.Flags().Bool("relay", true, "Enable relay protocol") - rootCmd.Flags().Bool("query", false, "Asks the storenode for stored messages") } diff --git a/cmd/store.go b/cmd/store.go new file mode 100644 index 00000000..8d3554b4 --- /dev/null +++ b/cmd/store.go @@ -0,0 +1,101 @@ +package cmd + +import ( + "database/sql" + "log" + + _ "github.com/mattn/go-sqlite3" + "github.com/status-im/go-waku/waku/v2/protocol" + store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" +) + +type DBStore struct { + store.MessageProvider + db *sql.DB +} + +func NewDBStore(path string) (*DBStore, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + return nil, err + } + + result := &DBStore{db: db} + + err = result.createTable() + if err != nil { + return nil, err + } + + return result, nil +} + +func (d *DBStore) createTable() error { + sqlStmt := `CREATE TABLE IF NOT EXISTS messages ( + id BLOB PRIMARY KEY, + timestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL DEFAULT 0 + ) WITHOUT ROWID;` + _, err := d.db.Exec(sqlStmt) + if err != nil { + return err + } + return nil +} + +func (d *DBStore) Stop() { + d.db.Close() +} + +func (d *DBStore) Put(cursor *protocol.Index, message *protocol.WakuMessage) error { + stmt, err := d.db.Prepare("INSERT INTO messages (id, timestamp, contentTopic, payload, version) VALUES (?, ?, ?, ?, ?)") + if err != nil { + return err + } + _, err = stmt.Exec(cursor.Digest, uint64(message.Timestamp), message.ContentTopic, message.Payload, message.Version) + if err != nil { + return err + } + + return nil +} + +func (d *DBStore) GetAll() ([]*protocol.WakuMessage, error) { + rows, err := d.db.Query("SELECT timestamp, contentTopic, payload, version FROM messages ORDER BY timestamp ASC") + if err != nil { + return nil, err + } + + var result []*protocol.WakuMessage + + defer rows.Close() + + for rows.Next() { + var timestamp int64 + var contentTopic string + var payload []byte + var version uint32 + + err = rows.Scan(×tamp, &contentTopic, &payload, &version) + if err != nil { + log.Fatal(err) + } + + msg := new(protocol.WakuMessage) + msg.ContentTopic = contentTopic + msg.Payload = payload + msg.Timestamp = float64(timestamp) + msg.Version = version + + result = append(result, msg) + } + + err = rows.Err() + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/go.mod b/go.mod index 9ba5e6f1..7b08cd83 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-msgio v0.0.6 github.com/magiconair/properties v1.8.4 // indirect + github.com/mattn/go-sqlite3 v1.14.6 github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multiaddr-net v0.2.0 diff --git a/go.sum b/go.sum index 94d1ec51..d1cca593 100644 --- a/go.sum +++ b/go.sum @@ -450,6 +450,8 @@ github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcncea github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=