mirror of https://github.com/status-im/go-waku.git
Persistent peer store
This commit is contained in:
parent
875dbbaf75
commit
1b746cdec8
2
go.mod
2
go.mod
|
@ -6,10 +6,12 @@ require (
|
||||||
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d
|
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d
|
||||||
github.com/ethereum/go-ethereum v1.9.5
|
github.com/ethereum/go-ethereum v1.9.5
|
||||||
github.com/golang/protobuf v1.4.1
|
github.com/golang/protobuf v1.4.1
|
||||||
|
github.com/ipfs/go-ds-sql v0.2.0
|
||||||
github.com/ipfs/go-log v1.0.4
|
github.com/ipfs/go-log v1.0.4
|
||||||
github.com/libp2p/go-libp2p v0.13.0
|
github.com/libp2p/go-libp2p v0.13.0
|
||||||
github.com/libp2p/go-libp2p-connmgr v0.2.4
|
github.com/libp2p/go-libp2p-connmgr v0.2.4
|
||||||
github.com/libp2p/go-libp2p-core v0.8.5
|
github.com/libp2p/go-libp2p-core v0.8.5
|
||||||
|
github.com/libp2p/go-libp2p-peerstore v0.2.6
|
||||||
github.com/libp2p/go-msgio v0.0.6
|
github.com/libp2p/go-msgio v0.0.6
|
||||||
github.com/magiconair/properties v1.8.4 // indirect
|
github.com/magiconair/properties v1.8.4 // indirect
|
||||||
github.com/mattn/go-sqlite3 v1.14.6
|
github.com/mattn/go-sqlite3 v1.14.6
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -192,6 +192,7 @@ github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqg
|
||||||
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
|
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
|
||||||
github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
||||||
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
||||||
|
github.com/ipfs/go-datastore v0.4.4 h1:rjvQ9+muFaJ+QZ7dN5B1MSDNQ0JVZKkkES/rMZmA8X8=
|
||||||
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
||||||
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
|
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
|
||||||
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
|
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
|
||||||
|
@ -202,6 +203,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
|
||||||
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
|
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
|
||||||
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
|
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
|
||||||
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
|
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
|
||||||
|
github.com/ipfs/go-ds-sql v0.2.0 h1:ZUHUbU5IydNuBWzcRMOZYkBUwTg+L56o23fEVcbWC7o=
|
||||||
|
github.com/ipfs/go-ds-sql v0.2.0/go.mod h1:/c47NpRiHobwn+8F8EpW0yBy8d3Mx/j/tIlrVN1e1Ec=
|
||||||
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
|
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
|
||||||
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
|
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
|
||||||
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
|
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
|
||||||
|
@ -255,6 +258,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
|
||||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
|
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||||
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
|
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
|
||||||
github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU=
|
github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU=
|
||||||
github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=
|
github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
package sqlite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
"github.com/status-im/go-waku/waku/persistence"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Queries are the postgres queries for a given table.
|
||||||
|
type Queries struct {
|
||||||
|
deleteQuery string
|
||||||
|
existsQuery string
|
||||||
|
getQuery string
|
||||||
|
putQuery string
|
||||||
|
queryQuery string
|
||||||
|
prefixQuery string
|
||||||
|
limitQuery string
|
||||||
|
offsetQuery string
|
||||||
|
getSizeQuery string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueries creates a new SQLite set of queries for the passed table
|
||||||
|
func NewQueries(tbl string, db *sql.DB) (*Queries, error) {
|
||||||
|
err := CreateTable(db, tbl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Queries{
|
||||||
|
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
|
||||||
|
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
|
||||||
|
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
|
||||||
|
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl),
|
||||||
|
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
|
||||||
|
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
|
||||||
|
limitQuery: ` LIMIT %d`,
|
||||||
|
offsetQuery: ` OFFSET %d`,
|
||||||
|
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete returns the query for deleting a row.
|
||||||
|
func (q Queries) Delete() string {
|
||||||
|
return q.deleteQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exists returns the query for determining if a row exists.
|
||||||
|
func (q Queries) Exists() string {
|
||||||
|
return q.existsQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the query for getting a row.
|
||||||
|
func (q Queries) Get() string {
|
||||||
|
return q.getQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put returns the query for putting a row.
|
||||||
|
func (q Queries) Put() string {
|
||||||
|
return q.putQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query returns the query for getting multiple rows.
|
||||||
|
func (q Queries) Query() string {
|
||||||
|
return q.queryQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prefix returns the query fragment for getting a rows with a key prefix.
|
||||||
|
func (q Queries) Prefix() string {
|
||||||
|
return q.prefixQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limit returns the query fragment for limiting results.
|
||||||
|
func (q Queries) Limit() string {
|
||||||
|
return q.limitQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// Offset returns the query fragment for returning rows from a given offset.
|
||||||
|
func (q Queries) Offset() string {
|
||||||
|
return q.offsetQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSize returns the query for determining the size of a value.
|
||||||
|
func (q Queries) GetSize() string {
|
||||||
|
return q.getSizeQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDB(path string) persistence.DBOption {
|
||||||
|
return persistence.WithDriver("sqlite3", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDB(path string) (*sql.DB, error) {
|
||||||
|
db, err := sql.Open("sqlite3", path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateTable(db *sql.DB, tableName string) error {
|
||||||
|
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName)
|
||||||
|
_, err := db.Exec(sqlStmt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -1,10 +1,9 @@
|
||||||
package cmd
|
package persistence
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
|
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
|
||||||
)
|
)
|
||||||
|
@ -14,14 +13,34 @@ type DBStore struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDBStore(path string) (*DBStore, error) {
|
type DBOption func(*DBStore) error
|
||||||
db, err := sql.Open("sqlite3", path)
|
|
||||||
|
func WithDB(db *sql.DB) DBOption {
|
||||||
|
return func(d *DBStore) error {
|
||||||
|
d.db = db
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDriver(driverName string, datasourceName string) DBOption {
|
||||||
|
return func(d *DBStore) error {
|
||||||
|
db, err := sql.Open(driverName, datasourceName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.db = db
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDBStore(opt DBOption) (*DBStore, error) {
|
||||||
|
result := new(DBStore)
|
||||||
|
|
||||||
|
err := opt(result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result := &DBStore{db: db}
|
|
||||||
|
|
||||||
err = result.createTable()
|
err = result.createTable()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
|
@ -119,10 +119,6 @@ func (w *WakuNode) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
w.subscriptions = nil
|
w.subscriptions = nil
|
||||||
|
|
||||||
if w.store != nil {
|
|
||||||
w.store.Stop()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) Host() host.Host {
|
func (w *WakuNode) Host() host.Host {
|
||||||
|
|
|
@ -209,10 +209,6 @@ func (store *WakuStore) Start() {
|
||||||
go store.storeIncomingMessages()
|
go store.storeIncomingMessages()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *WakuStore) Stop() {
|
|
||||||
store.msgProvider.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (store *WakuStore) storeIncomingMessages() {
|
func (store *WakuStore) storeIncomingMessages() {
|
||||||
for envelope := range store.msg {
|
for envelope := range store.msg {
|
||||||
index, err := computeIndex(envelope.Message())
|
index, err := computeIndex(envelope.Message())
|
||||||
|
|
Loading…
Reference in New Issue