persist messages on sqlite db

This commit is contained in:
Richard Ramos 2021-04-12 13:59:41 -04:00
parent ad9abe601c
commit fa79f9a864
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
5 changed files with 116 additions and 51 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
*.db
# Binaries for programs and plugins
*.exe
*.exe~

View File

@ -14,8 +14,6 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
)
func randomHex(n int) (string, error) {
@ -26,25 +24,6 @@ func randomHex(n int) (string, error) {
return hex.EncodeToString(bytes), nil
}
type DBStore struct {
store.MessageProvider
}
func (dbStore *DBStore) Put(message *protocol.WakuMessage) error {
fmt.Println("TODO: Implement MessageProvider.Put")
return nil
}
func (dbStore *DBStore) GetAll() ([]*protocol.WakuMessage, error) {
fmt.Println("TODO: Implement MessageProvider.GetAll. Returning a sample message")
exampleMessage := new(protocol.WakuMessage)
exampleMessage.ContentTopic = "dingpu"
exampleMessage.Payload = []byte("Hello!")
exampleMessage.Version = 0
return []*protocol.WakuMessage{exampleMessage}, nil
}
var rootCmd = &cobra.Command{
Use: "waku",
Short: "Start a waku node",
@ -56,9 +35,9 @@ var rootCmd = &cobra.Command{
relay, _ := cmd.Flags().GetBool("relay")
key, _ := cmd.Flags().GetString("nodekey")
store, _ := cmd.Flags().GetBool("store")
dbPath, _ := cmd.Flags().GetString("dbpath")
storenode, _ := cmd.Flags().GetString("storenode")
staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes")
query, _ := cmd.Flags().GetBool("query")
hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port))
@ -84,8 +63,14 @@ var rootCmd = &cobra.Command{
wakuNode.MountRelay()
}
if store {
err := wakuNode.MountStore(new(DBStore))
if store && dbPath != "" {
db, err := NewDBStore(dbPath)
if err != nil {
fmt.Println(err)
return
}
err = wakuNode.MountStore(db)
if err != nil {
fmt.Println(err)
return
@ -108,32 +93,6 @@ var rootCmd = &cobra.Command{
}
}
if query {
if !store {
fmt.Println("Store protocol was not started")
return
}
var DefaultContentTopic string = "dingpu"
response, err := wakuNode.Query(DefaultContentTopic, true, 10)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(fmt.Sprint("Page Size: ", response.PagingInfo.PageSize))
fmt.Println(fmt.Sprint("Direction: ", response.PagingInfo.Direction))
if response.PagingInfo.Cursor != nil {
fmt.Println(fmt.Sprint("Cursor - ReceivedTime: ", response.PagingInfo.Cursor.ReceivedTime))
fmt.Println(fmt.Sprint("Cursor - Digest: ", hex.EncodeToString(response.PagingInfo.Cursor.Digest)))
}
fmt.Println("Messages:")
for i, msg := range response.Messages {
fmt.Println(fmt.Sprint(i, "- ", string(msg.Payload))) // Normaly you'd have to decode these, but i'm using v0
}
}
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
@ -158,9 +117,9 @@ func init() {
rootCmd.Flags().String("nodekey", "", "P2P node private key as hex (default random)")
rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated")
rootCmd.Flags().Bool("store", false, "Enable store protocol")
rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file")
rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol")
rootCmd.Flags().Bool("relay", true, "Enable relay protocol")
rootCmd.Flags().Bool("query", false, "Asks the storenode for stored messages")
}

101
cmd/store.go Normal file
View File

@ -0,0 +1,101 @@
package cmd
import (
"database/sql"
"log"
_ "github.com/mattn/go-sqlite3"
"github.com/status-im/go-waku/waku/v2/protocol"
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
)
type DBStore struct {
store.MessageProvider
db *sql.DB
}
func NewDBStore(path string) (*DBStore, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, err
}
result := &DBStore{db: db}
err = result.createTable()
if err != nil {
return nil, err
}
return result, nil
}
func (d *DBStore) createTable() error {
sqlStmt := `CREATE TABLE IF NOT EXISTS messages (
id BLOB PRIMARY KEY,
timestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL DEFAULT 0
) WITHOUT ROWID;`
_, err := d.db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}
func (d *DBStore) Stop() {
d.db.Close()
}
func (d *DBStore) Put(cursor *protocol.Index, message *protocol.WakuMessage) error {
stmt, err := d.db.Prepare("INSERT INTO messages (id, timestamp, contentTopic, payload, version) VALUES (?, ?, ?, ?, ?)")
if err != nil {
return err
}
_, err = stmt.Exec(cursor.Digest, uint64(message.Timestamp), message.ContentTopic, message.Payload, message.Version)
if err != nil {
return err
}
return nil
}
func (d *DBStore) GetAll() ([]*protocol.WakuMessage, error) {
rows, err := d.db.Query("SELECT timestamp, contentTopic, payload, version FROM messages ORDER BY timestamp ASC")
if err != nil {
return nil, err
}
var result []*protocol.WakuMessage
defer rows.Close()
for rows.Next() {
var timestamp int64
var contentTopic string
var payload []byte
var version uint32
err = rows.Scan(&timestamp, &contentTopic, &payload, &version)
if err != nil {
log.Fatal(err)
}
msg := new(protocol.WakuMessage)
msg.ContentTopic = contentTopic
msg.Payload = payload
msg.Timestamp = float64(timestamp)
msg.Version = version
result = append(result, msg)
}
err = rows.Err()
if err != nil {
return nil, err
}
return result, nil
}

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-msgio v0.0.6
github.com/magiconair/properties v1.8.4 // indirect
github.com/mattn/go-sqlite3 v1.14.6
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr-net v0.2.0

2
go.sum
View File

@ -450,6 +450,8 @@ github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcncea
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=