From 1b746cdec800907f61443164ec19d7d71f010515 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 13 Apr 2021 14:52:57 -0400 Subject: [PATCH] Persistent peer store --- go.mod | 2 + go.sum | 4 + waku/persistence/sqlite/sqlite.go | 107 ++++++++++++++++++++++ {cmd => waku/persistence}/store.go | 31 +++++-- waku/v2/node/wakunode2.go | 4 - waku/v2/protocol/waku_store/waku_store.go | 4 - 6 files changed, 138 insertions(+), 14 deletions(-) create mode 100644 waku/persistence/sqlite/sqlite.go rename {cmd => waku/persistence}/store.go (80%) diff --git a/go.mod b/go.mod index 7b08cd83..35d9485c 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,12 @@ require ( github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d github.com/ethereum/go-ethereum v1.9.5 github.com/golang/protobuf v1.4.1 + github.com/ipfs/go-ds-sql v0.2.0 github.com/ipfs/go-log v1.0.4 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-connmgr v0.2.4 github.com/libp2p/go-libp2p-core v0.8.5 + github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-msgio v0.0.6 github.com/magiconair/properties v1.8.4 // indirect github.com/mattn/go-sqlite3 v1.14.6 diff --git a/go.sum b/go.sum index d1cca593..f1d8193f 100644 --- a/go.sum +++ b/go.sum @@ -192,6 +192,7 @@ github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqg github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= +github.com/ipfs/go-datastore v0.4.4 h1:rjvQ9+muFaJ+QZ7dN5B1MSDNQ0JVZKkkES/rMZmA8X8= github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= @@ -202,6 +203,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= +github.com/ipfs/go-ds-sql v0.2.0 h1:ZUHUbU5IydNuBWzcRMOZYkBUwTg+L56o23fEVcbWC7o= +github.com/ipfs/go-ds-sql v0.2.0/go.mod h1:/c47NpRiHobwn+8F8EpW0yBy8d3Mx/j/tIlrVN1e1Ec= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= @@ -255,6 +258,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go new file mode 100644 index 00000000..e9ac51ee --- /dev/null +++ b/waku/persistence/sqlite/sqlite.go @@ -0,0 +1,107 @@ +package sqlite + +import ( + "database/sql" + "fmt" + + _ "github.com/mattn/go-sqlite3" + "github.com/status-im/go-waku/waku/persistence" +) + +// Queries are the postgres queries for a given table. +type Queries struct { + deleteQuery string + existsQuery string + getQuery string + putQuery string + queryQuery string + prefixQuery string + limitQuery string + offsetQuery string + getSizeQuery string +} + +// NewQueries creates a new SQLite set of queries for the passed table +func NewQueries(tbl string, db *sql.DB) (*Queries, error) { + err := CreateTable(db, tbl) + if err != nil { + return nil, err + } + return &Queries{ + deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), + existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), + getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl), + putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl), + queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl), + prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`, + limitQuery: ` LIMIT %d`, + offsetQuery: ` OFFSET %d`, + getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl), + }, nil +} + +// Delete returns the query for deleting a row. +func (q Queries) Delete() string { + return q.deleteQuery +} + +// Exists returns the query for determining if a row exists. +func (q Queries) Exists() string { + return q.existsQuery +} + +// Get returns the query for getting a row. +func (q Queries) Get() string { + return q.getQuery +} + +// Put returns the query for putting a row. +func (q Queries) Put() string { + return q.putQuery +} + +// Query returns the query for getting multiple rows. +func (q Queries) Query() string { + return q.queryQuery +} + +// Prefix returns the query fragment for getting a rows with a key prefix. +func (q Queries) Prefix() string { + return q.prefixQuery +} + +// Limit returns the query fragment for limiting results. +func (q Queries) Limit() string { + return q.limitQuery +} + +// Offset returns the query fragment for returning rows from a given offset. +func (q Queries) Offset() string { + return q.offsetQuery +} + +// GetSize returns the query for determining the size of a value. +func (q Queries) GetSize() string { + return q.getSizeQuery +} + +func WithDB(path string) persistence.DBOption { + return persistence.WithDriver("sqlite3", path) +} + +func NewDB(path string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + return nil, err + } + return db, nil +} + +func CreateTable(db *sql.DB, tableName string) error { + sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName) + _, err := db.Exec(sqlStmt) + if err != nil { + return err + } + return nil +} diff --git a/cmd/store.go b/waku/persistence/store.go similarity index 80% rename from cmd/store.go rename to waku/persistence/store.go index 8d3554b4..19c06cea 100644 --- a/cmd/store.go +++ b/waku/persistence/store.go @@ -1,10 +1,9 @@ -package cmd +package persistence import ( "database/sql" "log" - _ "github.com/mattn/go-sqlite3" "github.com/status-im/go-waku/waku/v2/protocol" store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" ) @@ -14,14 +13,34 @@ type DBStore struct { db *sql.DB } -func NewDBStore(path string) (*DBStore, error) { - db, err := sql.Open("sqlite3", path) +type DBOption func(*DBStore) error + +func WithDB(db *sql.DB) DBOption { + return func(d *DBStore) error { + d.db = db + return nil + } +} + +func WithDriver(driverName string, datasourceName string) DBOption { + return func(d *DBStore) error { + db, err := sql.Open(driverName, datasourceName) + if err != nil { + return err + } + d.db = db + return nil + } +} + +func NewDBStore(opt DBOption) (*DBStore, error) { + result := new(DBStore) + + err := opt(result) if err != nil { return nil, err } - result := &DBStore{db: db} - err = result.createTable() if err != nil { return nil, err diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8d427e7a..fecdb323 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -119,10 +119,6 @@ func (w *WakuNode) Stop() { } w.subscriptions = nil - - if w.store != nil { - w.store.Stop() - } } func (w *WakuNode) Host() host.Host { diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index a211b9a5..83af6d3d 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -209,10 +209,6 @@ func (store *WakuStore) Start() { go store.storeIncomingMessages() } -func (store *WakuStore) Stop() { - store.msgProvider.Stop() -} - func (store *WakuStore) storeIncomingMessages() { for envelope := range store.msg { index, err := computeIndex(envelope.Message())