mirror of https://github.com/status-im/go-waku.git
feat: postgresql support
This commit is contained in:
parent
2c9c763c30
commit
f10b1b0d7a
3
Makefile
3
Makefile
|
@ -86,7 +86,8 @@ test-ci: _before-cc test _after-cc
|
|||
|
||||
generate:
|
||||
${GOBIN} generate ./waku/v2/protocol/pb/generate.go
|
||||
${GOBIN} generate ./waku/persistence/migrations/sql
|
||||
${GOBIN} generate ./waku/persistence/sqlite/migrations/sql
|
||||
${GOBIN} generate ./waku/persistence/postgres/migrations/sql
|
||||
${GOBIN} generate ./waku/v2/protocol/rln/contracts/generate.go
|
||||
${GOBIN} generate ./waku/v2/protocol/rln/doc.go
|
||||
|
||||
|
|
5
go.mod
5
go.mod
|
@ -36,7 +36,10 @@ require (
|
|||
golang.org/x/text v0.4.0
|
||||
)
|
||||
|
||||
require github.com/waku-org/go-noise v0.0.4
|
||||
require (
|
||||
github.com/lib/pq v1.10.0
|
||||
github.com/waku-org/go-noise v0.0.4
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
package waku
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/persistence/postgres"
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite"
|
||||
)
|
||||
|
||||
func validateDBUrl(val string) error {
|
||||
matched, err := regexp.Match(`^[\w\+]+:\/\/[\w\/\\\.\:\@]+\?{0,1}.*$`, []byte(val))
|
||||
if !matched || err != nil {
|
||||
return errors.New("invalid db url option format")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) {
|
||||
var db *sql.DB
|
||||
var migrationFn func(*sql.DB) error
|
||||
var err error
|
||||
|
||||
dbURL := ""
|
||||
if databaseURL != "" {
|
||||
err := validateDBUrl(databaseURL)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
dbURL = databaseURL
|
||||
} else {
|
||||
// In memoryDB
|
||||
dbURL = "sqlite://:memory:"
|
||||
}
|
||||
|
||||
dbURLParts := strings.Split(dbURL, "://")
|
||||
dbEngine := dbURLParts[0]
|
||||
dbParams := dbURLParts[1]
|
||||
switch dbEngine {
|
||||
case "sqlite3":
|
||||
db, migrationFn, err = sqlite.NewDB(dbParams)
|
||||
case "postgresql":
|
||||
db, migrationFn, err = postgres.NewDB(dbURL)
|
||||
default:
|
||||
err = errors.New("unsupported database engine")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return db, migrationFn, nil
|
||||
|
||||
}
|
46
waku/node.go
46
waku/node.go
|
@ -5,14 +5,11 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
@ -79,14 +76,6 @@ func freePort() (int, error) {
|
|||
return port, nil
|
||||
}
|
||||
|
||||
func validateDBUrl(val string) error {
|
||||
matched, err := regexp.Match(`^[\w\+]+:\/\/[\w\/\\\.\:\@]+\?{0,1}.*$`, []byte(val))
|
||||
if !matched || err != nil {
|
||||
return errors.New("invalid db url option format")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const dialTimeout = 7 * time.Second
|
||||
|
||||
// Execute starts a go-waku node with settings determined by the Options parameter
|
||||
|
@ -112,32 +101,10 @@ func Execute(options Options) {
|
|||
logger := utils.Logger().With(logging.HostID("node", id))
|
||||
|
||||
var db *sql.DB
|
||||
var migrationFn func(*sql.DB) error
|
||||
if options.Store.Enable {
|
||||
dbURL := ""
|
||||
if options.Store.DatabaseURL != "" {
|
||||
err := validateDBUrl(options.Store.DatabaseURL)
|
||||
failOnErr(err, "connecting to the db")
|
||||
dbURL = options.Store.DatabaseURL
|
||||
} else {
|
||||
// In memoryDB
|
||||
dbURL = "sqlite://:memory:"
|
||||
}
|
||||
|
||||
// TODO: this should be refactored to use any kind of DB, not just sqlite
|
||||
// Extract to separate module
|
||||
|
||||
dbURLParts := strings.Split(dbURL, "://")
|
||||
dbEngine := dbURLParts[0]
|
||||
dbParams := dbURLParts[1]
|
||||
switch dbEngine {
|
||||
case "sqlite3":
|
||||
db, err = sqlite.NewDB(dbParams)
|
||||
failOnErr(err, "Could not connect to DB")
|
||||
logger.Info("using database: ", zap.String("path", dbParams))
|
||||
default:
|
||||
failOnErr(errors.New("unknown database engine"), fmt.Sprintf("%s is not supported by go-waku", dbEngine))
|
||||
}
|
||||
|
||||
db, migrationFn, err = extractDBAndMigration(options.Store.DatabaseURL)
|
||||
failOnErr(err, "Could not connect to DB")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
@ -227,10 +194,13 @@ func Execute(options Options) {
|
|||
|
||||
if options.Store.Enable {
|
||||
nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...))
|
||||
dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime))
|
||||
dbStore, err := persistence.NewDBStore(logger,
|
||||
persistence.WithDB(db),
|
||||
persistence.WithMigrations(migrationFn),
|
||||
persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime),
|
||||
)
|
||||
failOnErr(err, "DBStore")
|
||||
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
|
||||
|
||||
}
|
||||
|
||||
if options.LightPush.Enable {
|
||||
|
|
|
@ -0,0 +1,367 @@
|
|||
// Code generated by go-bindata. DO NOT EDIT.
|
||||
// sources:
|
||||
// 1_messages.down.sql (124B)
|
||||
// 1_messages.up.sql (452B)
|
||||
// 2_messages_index.down.sql (60B)
|
||||
// 2_messages_index.up.sql (226B)
|
||||
// doc.go (74B)
|
||||
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func bindataRead(data []byte, name string) ([]byte, error) {
|
||||
gz, err := gzip.NewReader(bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read %q: %v", name, err)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
_, err = io.Copy(&buf, gz)
|
||||
clErr := gz.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read %q: %v", name, err)
|
||||
}
|
||||
if clErr != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
type asset struct {
|
||||
bytes []byte
|
||||
info os.FileInfo
|
||||
digest [sha256.Size]byte
|
||||
}
|
||||
|
||||
type bindataFileInfo struct {
|
||||
name string
|
||||
size int64
|
||||
mode os.FileMode
|
||||
modTime time.Time
|
||||
}
|
||||
|
||||
func (fi bindataFileInfo) Name() string {
|
||||
return fi.name
|
||||
}
|
||||
func (fi bindataFileInfo) Size() int64 {
|
||||
return fi.size
|
||||
}
|
||||
func (fi bindataFileInfo) Mode() os.FileMode {
|
||||
return fi.mode
|
||||
}
|
||||
func (fi bindataFileInfo) ModTime() time.Time {
|
||||
return fi.modTime
|
||||
}
|
||||
func (fi bindataFileInfo) IsDir() bool {
|
||||
return false
|
||||
}
|
||||
func (fi bindataFileInfo) Sys() interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
var __1_messagesDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4d\x2d\x2e\x4e\x4c\x4f\x8d\x2f\x4e\xcd\x4b\x49\x2d\x0a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\xb0\xe6\xc2\xab\xba\x28\x35\x39\x35\xb3\x0c\x53\x7d\x88\xa3\x93\x8f\x2b\xa6\x7a\x6b\x2e\x40\x00\x00\x00\xff\xff\xc2\x48\x8c\x05\x7c\x00\x00\x00")
|
||||
|
||||
func _1_messagesDownSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__1_messagesDownSql,
|
||||
"1_messages.down.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _1_messagesDownSql() (*asset, error) {
|
||||
bytes, err := _1_messagesDownSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "1_messages.down.sql", size: 124, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xff, 0x4a, 0x8e, 0xa9, 0xd9, 0xa8, 0xa4, 0x73, 0x3a, 0x54, 0xe4, 0x35, 0xfd, 0xea, 0x87, 0x4c, 0xa, 0x5c, 0xc0, 0xc9, 0xe7, 0x8, 0x8c, 0x6f, 0x60, 0x9e, 0x54, 0x77, 0x59, 0xd0, 0x2b, 0xfe}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var __1_messagesUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x90\x41\x4f\x83\x40\x10\x85\xcf\xec\xaf\x98\x23\x24\x1c\xbc\x73\x5a\xda\x69\x33\x11\x17\xb3\x4c\x93\x72\x32\x14\x26\x66\x13\x59\x08\x4b\x1b\xfd\xf7\x46\xad\x4a\x5a\x35\x9e\xbf\x37\x6f\xde\x7b\x2b\x8b\x9a\x11\x58\xe7\x05\x02\x6d\xc0\x94\x0c\xb8\xa7\x8a\x2b\xe8\x25\x84\xe6\x51\x20\x56\x91\xeb\x20\xaf\x19\x75\xaa\xa2\x49\x5a\x71\x27\x99\xd8\xf5\x12\xe6\xa6\x1f\x21\xa7\x2d\x19\x7e\xbf\x34\xbb\xa2\x48\x55\x14\xc4\x77\x7f\x2b\xda\xc1\xcf\xe2\x67\x1e\x46\xd7\x7e\x58\x2f\xe9\x78\x3c\x84\xe3\xe1\x37\xd8\xbc\x3c\x0d\xcd\x77\xa0\x93\x4c\xc1\x0d\x1e\xc8\x30\x6e\xd1\x7e\x49\x61\x8d\x1b\xbd\x2b\x18\x6e\x52\x15\xad\x4a\x53\xb1\xd5\x6f\x29\xce\xb5\xc8\x77\xf2\x0c\xf7\x96\xee\xb4\xad\xe1\x16\x6b\x88\x5d\x97\xc2\xe2\x75\xa2\x92\x4c\xa9\xf3\x40\x64\xd6\xb8\xff\x79\xa0\x87\xcb\xba\xa5\xf9\x44\xf1\x05\x4a\xb2\xff\xf8\x5d\x4f\xbc\x70\xbc\x82\x49\xf6\x1a\x00\x00\xff\xff\xa0\x46\xcd\x13\xc4\x01\x00\x00")
|
||||
|
||||
func _1_messagesUpSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__1_messagesUpSql,
|
||||
"1_messages.up.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _1_messagesUpSql() (*asset, error) {
|
||||
bytes, err := _1_messagesUpSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "1_messages.up.sql", size: 452, mode: os.FileMode(0664), modTime: time.Unix(1672853147, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe4, 0x17, 0xde, 0xd4, 0x55, 0x47, 0x7f, 0x61, 0xe6, 0xbd, 0x2e, 0x89, 0xb5, 0x7, 0xe1, 0x31, 0x1b, 0xd3, 0x20, 0x3d, 0x3e, 0x68, 0x54, 0xfe, 0xd3, 0x62, 0x51, 0x87, 0x5f, 0xbf, 0x57, 0x64}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var __2_messages_indexDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x8c\xcf\x2d\x4e\x8f\x37\xb4\xe6\xc2\x23\x6b\x64\xcd\x05\x08\x00\x00\xff\xff\x53\x77\x9e\x4d\x3c\x00\x00\x00")
|
||||
|
||||
func _2_messages_indexDownSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__2_messages_indexDownSql,
|
||||
"2_messages_index.down.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _2_messages_indexDownSql() (*asset, error) {
|
||||
bytes, err := _2_messages_indexDownSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "2_messages_index.down.sql", size: 60, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6e, 0xcb, 0x70, 0x82, 0x33, 0x13, 0x70, 0xd5, 0xbd, 0x3e, 0x68, 0x9, 0x4f, 0x78, 0xa9, 0xc, 0xd6, 0xf4, 0x64, 0xa0, 0x8c, 0xe4, 0x0, 0x15, 0x71, 0xf0, 0x5, 0xdb, 0xa6, 0xf2, 0x12, 0x60}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var __2_messages_indexUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\xf0\xf3\x0f\x51\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x8c\xcf\x2d\x4e\x8f\x37\x54\xf0\xf7\x53\xc8\x4d\x2d\x2e\x4e\x4c\x4f\xd5\x48\xce\xcf\x2b\x49\xcd\x2b\x09\xc9\x2f\xc8\x4c\x56\x70\x0c\x76\xd6\x51\x28\x28\x4d\x2a\x2e\x4d\x42\x12\x28\x4e\xcd\x4b\x49\x2d\x0a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x80\x08\x66\xa6\x80\x68\x4d\x6b\x2e\x82\xd6\x19\xe1\xb4\xce\xc5\x15\xdd\x3e\x88\x08\xba\x85\x10\xd1\xcc\x14\x30\x43\xd3\x9a\x0b\x10\x00\x00\xff\xff\x2a\x3b\xab\xf4\xe2\x00\x00\x00")
|
||||
|
||||
func _2_messages_indexUpSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__2_messages_indexUpSql,
|
||||
"2_messages_index.up.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _2_messages_indexUpSql() (*asset, error) {
|
||||
bytes, err := _2_messages_indexUpSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "2_messages_index.up.sql", size: 226, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xce, 0xb1, 0xc8, 0x2d, 0xa8, 0x6f, 0x83, 0xfb, 0xf2, 0x40, 0x30, 0xe9, 0xd, 0x18, 0x54, 0xe8, 0xf5, 0xf5, 0xc4, 0x5b, 0xf5, 0xa4, 0x94, 0x50, 0x56, 0x4a, 0xc8, 0x73, 0x3f, 0xf1, 0x56, 0xce}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
|
||||
|
||||
func docGoBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
_docGo,
|
||||
"doc.go",
|
||||
)
|
||||
}
|
||||
|
||||
func docGo() (*asset, error) {
|
||||
bytes, err := docGoBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// Asset loads and returns the asset for the given name.
|
||||
// It returns an error if the asset could not be found or
|
||||
// could not be loaded.
|
||||
func Asset(name string) ([]byte, error) {
|
||||
canonicalName := strings.Replace(name, "\\", "/", -1)
|
||||
if f, ok := _bindata[canonicalName]; ok {
|
||||
a, err := f()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
|
||||
}
|
||||
return a.bytes, nil
|
||||
}
|
||||
return nil, fmt.Errorf("Asset %s not found", name)
|
||||
}
|
||||
|
||||
// AssetString returns the asset contents as a string (instead of a []byte).
|
||||
func AssetString(name string) (string, error) {
|
||||
data, err := Asset(name)
|
||||
return string(data), err
|
||||
}
|
||||
|
||||
// MustAsset is like Asset but panics when Asset would return an error.
|
||||
// It simplifies safe initialization of global variables.
|
||||
func MustAsset(name string) []byte {
|
||||
a, err := Asset(name)
|
||||
if err != nil {
|
||||
panic("asset: Asset(" + name + "): " + err.Error())
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// MustAssetString is like AssetString but panics when Asset would return an
|
||||
// error. It simplifies safe initialization of global variables.
|
||||
func MustAssetString(name string) string {
|
||||
return string(MustAsset(name))
|
||||
}
|
||||
|
||||
// AssetInfo loads and returns the asset info for the given name.
|
||||
// It returns an error if the asset could not be found or
|
||||
// could not be loaded.
|
||||
func AssetInfo(name string) (os.FileInfo, error) {
|
||||
canonicalName := strings.Replace(name, "\\", "/", -1)
|
||||
if f, ok := _bindata[canonicalName]; ok {
|
||||
a, err := f()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
|
||||
}
|
||||
return a.info, nil
|
||||
}
|
||||
return nil, fmt.Errorf("AssetInfo %s not found", name)
|
||||
}
|
||||
|
||||
// AssetDigest returns the digest of the file with the given name. It returns an
|
||||
// error if the asset could not be found or the digest could not be loaded.
|
||||
func AssetDigest(name string) ([sha256.Size]byte, error) {
|
||||
canonicalName := strings.Replace(name, "\\", "/", -1)
|
||||
if f, ok := _bindata[canonicalName]; ok {
|
||||
a, err := f()
|
||||
if err != nil {
|
||||
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err)
|
||||
}
|
||||
return a.digest, nil
|
||||
}
|
||||
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name)
|
||||
}
|
||||
|
||||
// Digests returns a map of all known files and their checksums.
|
||||
func Digests() (map[string][sha256.Size]byte, error) {
|
||||
mp := make(map[string][sha256.Size]byte, len(_bindata))
|
||||
for name := range _bindata {
|
||||
a, err := _bindata[name]()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mp[name] = a.digest
|
||||
}
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
// AssetNames returns the names of the assets.
|
||||
func AssetNames() []string {
|
||||
names := make([]string, 0, len(_bindata))
|
||||
for name := range _bindata {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// _bindata is a table, holding each asset generator, mapped to its name.
|
||||
var _bindata = map[string]func() (*asset, error){
|
||||
"1_messages.down.sql": _1_messagesDownSql,
|
||||
|
||||
"1_messages.up.sql": _1_messagesUpSql,
|
||||
|
||||
"2_messages_index.down.sql": _2_messages_indexDownSql,
|
||||
|
||||
"2_messages_index.up.sql": _2_messages_indexUpSql,
|
||||
|
||||
"doc.go": docGo,
|
||||
}
|
||||
|
||||
// AssetDir returns the file names below a certain
|
||||
// directory embedded in the file by go-bindata.
|
||||
// For example if you run go-bindata on data/... and data contains the
|
||||
// following hierarchy:
|
||||
// data/
|
||||
// foo.txt
|
||||
// img/
|
||||
// a.png
|
||||
// b.png
|
||||
// then AssetDir("data") would return []string{"foo.txt", "img"},
|
||||
// AssetDir("data/img") would return []string{"a.png", "b.png"},
|
||||
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
|
||||
// AssetDir("") will return []string{"data"}.
|
||||
func AssetDir(name string) ([]string, error) {
|
||||
node := _bintree
|
||||
if len(name) != 0 {
|
||||
canonicalName := strings.Replace(name, "\\", "/", -1)
|
||||
pathList := strings.Split(canonicalName, "/")
|
||||
for _, p := range pathList {
|
||||
node = node.Children[p]
|
||||
if node == nil {
|
||||
return nil, fmt.Errorf("Asset %s not found", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
if node.Func != nil {
|
||||
return nil, fmt.Errorf("Asset %s not found", name)
|
||||
}
|
||||
rv := make([]string, 0, len(node.Children))
|
||||
for childName := range node.Children {
|
||||
rv = append(rv, childName)
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
type bintree struct {
|
||||
Func func() (*asset, error)
|
||||
Children map[string]*bintree
|
||||
}
|
||||
|
||||
var _bintree = &bintree{nil, map[string]*bintree{
|
||||
"1_messages.down.sql": &bintree{_1_messagesDownSql, map[string]*bintree{}},
|
||||
"1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}},
|
||||
"2_messages_index.down.sql": &bintree{_2_messages_indexDownSql, map[string]*bintree{}},
|
||||
"2_messages_index.up.sql": &bintree{_2_messages_indexUpSql, map[string]*bintree{}},
|
||||
"doc.go": &bintree{docGo, map[string]*bintree{}},
|
||||
}}
|
||||
|
||||
// RestoreAsset restores an asset under the given directory.
|
||||
func RestoreAsset(dir, name string) error {
|
||||
data, err := Asset(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
info, err := AssetInfo(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
|
||||
}
|
||||
|
||||
// RestoreAssets restores an asset under the given directory recursively.
|
||||
func RestoreAssets(dir, name string) error {
|
||||
children, err := AssetDir(name)
|
||||
// File
|
||||
if err != nil {
|
||||
return RestoreAsset(dir, name)
|
||||
}
|
||||
// Dir
|
||||
for _, child := range children {
|
||||
err = RestoreAssets(dir, filepath.Join(name, child))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func _filePath(dir, name string) string {
|
||||
canonicalName := strings.Replace(name, "\\", "/", -1)
|
||||
return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...)
|
||||
}
|
|
@ -7,38 +7,30 @@ import (
|
|||
"database/sql"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database/sqlite3"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
|
||||
bindata "github.com/golang-migrate/migrate/v4/source/go_bindata"
|
||||
)
|
||||
|
||||
// Migrate applies migrations.
|
||||
func Migrate(db *sql.DB) error {
|
||||
func Migrate(db *sql.DB, driver database.Driver) error {
|
||||
return migrateDB(db, bindata.Resource(
|
||||
AssetNames(),
|
||||
func(name string) ([]byte, error) {
|
||||
return Asset(name)
|
||||
},
|
||||
))
|
||||
Asset,
|
||||
), driver)
|
||||
}
|
||||
|
||||
// Migrate database using provided resources.
|
||||
func migrateDB(db *sql.DB, resources *bindata.AssetSource) error {
|
||||
func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error {
|
||||
source, err := bindata.WithInstance(resources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{
|
||||
MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err := migrate.NewWithInstance(
|
||||
"go-bindata",
|
||||
source,
|
||||
"sqlite",
|
||||
"gowakudb",
|
||||
driver)
|
||||
if err != nil {
|
||||
return err
|
|
@ -0,0 +1,13 @@
|
|||
CREATE TABLE IF NOT EXISTS message (
|
||||
id BYTEA,
|
||||
receiverTimestamp BIGINT NOT NULL,
|
||||
senderTimestamp BIGINT NOT NULL,
|
||||
contentTopic BYTEA NOT NULL,
|
||||
pubsubTopic BYTEA NOT NULL,
|
||||
payload BYTEA,
|
||||
version INTEGER NOT NULL DEFAULT 0,
|
||||
CONSTRAINT messageIndex PRIMARY KEY (id, pubsubTopic)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS message_senderTimestamp ON message(senderTimestamp);
|
||||
CREATE INDEX IF NOT EXISTS message_receiverTimestamp ON message(receiverTimestamp);
|
|
@ -0,0 +1,146 @@
|
|||
package postgres
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/waku-org/go-waku/waku/persistence"
|
||||
"github.com/waku-org/go-waku/waku/persistence/postgres/migrations"
|
||||
)
|
||||
|
||||
// Queries are the postgresql queries for a given table.
|
||||
type Queries struct {
|
||||
deleteQuery string
|
||||
existsQuery string
|
||||
getQuery string
|
||||
putQuery string
|
||||
queryQuery string
|
||||
prefixQuery string
|
||||
limitQuery string
|
||||
offsetQuery string
|
||||
getSizeQuery string
|
||||
}
|
||||
|
||||
// NewQueries creates a new Postgresql set of queries for the passed table
|
||||
func NewQueries(tbl string, db *sql.DB) (*Queries, error) {
|
||||
err := CreateTable(db, tbl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Queries{
|
||||
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
|
||||
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
|
||||
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
|
||||
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl),
|
||||
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
|
||||
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
|
||||
limitQuery: ` LIMIT %d`,
|
||||
offsetQuery: ` OFFSET %d`,
|
||||
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Delete returns the query for deleting a row.
|
||||
func (q Queries) Delete() string {
|
||||
return q.deleteQuery
|
||||
}
|
||||
|
||||
// Exists returns the query for determining if a row exists.
|
||||
func (q Queries) Exists() string {
|
||||
return q.existsQuery
|
||||
}
|
||||
|
||||
// Get returns the query for getting a row.
|
||||
func (q Queries) Get() string {
|
||||
return q.getQuery
|
||||
}
|
||||
|
||||
// Put returns the query for putting a row.
|
||||
func (q Queries) Put() string {
|
||||
return q.putQuery
|
||||
}
|
||||
|
||||
// Query returns the query for getting multiple rows.
|
||||
func (q Queries) Query() string {
|
||||
return q.queryQuery
|
||||
}
|
||||
|
||||
// Prefix returns the query fragment for getting a rows with a key prefix.
|
||||
func (q Queries) Prefix() string {
|
||||
return q.prefixQuery
|
||||
}
|
||||
|
||||
// Limit returns the query fragment for limiting results.
|
||||
func (q Queries) Limit() string {
|
||||
return q.limitQuery
|
||||
}
|
||||
|
||||
// Offset returns the query fragment for returning rows from a given offset.
|
||||
func (q Queries) Offset() string {
|
||||
return q.offsetQuery
|
||||
}
|
||||
|
||||
// GetSize returns the query for determining the size of a value.
|
||||
func (q Queries) GetSize() string {
|
||||
return q.getSizeQuery
|
||||
}
|
||||
|
||||
// WithDB is a DBOption that lets you use a postgresql DBStore and run migrations
|
||||
func WithDB(dburl string, migrate bool) persistence.DBOption {
|
||||
return func(d *persistence.DBStore) error {
|
||||
driverOption := persistence.WithDriver("postgres", dburl)
|
||||
err := driverOption(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !migrate {
|
||||
return nil
|
||||
}
|
||||
|
||||
migrationOpt := persistence.WithMigrations(Migrate)
|
||||
err = migrationOpt(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewDB connects to postgres DB in the specified path
|
||||
func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
|
||||
db, err := sql.Open("postgres", dburl)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return db, Migrate, nil
|
||||
}
|
||||
|
||||
func migrationDriver(db *sql.DB) (database.Driver, error) {
|
||||
return postgres.WithInstance(db, &postgres.Config{
|
||||
MigrationsTable: "gowaku_" + postgres.DefaultMigrationsTable,
|
||||
})
|
||||
}
|
||||
|
||||
// CreateTable creates the table that will persist the peers
|
||||
func CreateTable(db *sql.DB, tableName string) error {
|
||||
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName)
|
||||
_, err := db.Exec(sqlStmt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := migrationDriver(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return migrations.Migrate(db, migrationDriver)
|
||||
}
|
|
@ -88,7 +88,7 @@ func _1_messagesDownSql() (*asset, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "1_messages.down.sql", size: 124, mode: os.FileMode(0664), modTime: time.Unix(1663712987, 0)}
|
||||
info := bindataFileInfo{name: "1_messages.down.sql", size: 124, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xff, 0x4a, 0x8e, 0xa9, 0xd9, 0xa8, 0xa4, 0x73, 0x3a, 0x54, 0xe4, 0x35, 0xfd, 0xea, 0x87, 0x4c, 0xa, 0x5c, 0xc0, 0xc9, 0xe7, 0x8, 0x8c, 0x6f, 0x60, 0x9e, 0x54, 0x77, 0x59, 0xd0, 0x2b, 0xfe}}
|
||||
return a, nil
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func _1_messagesUpSql() (*asset, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "1_messages.up.sql", size: 464, mode: os.FileMode(0664), modTime: time.Unix(1663712987, 0)}
|
||||
info := bindataFileInfo{name: "1_messages.up.sql", size: 464, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4, 0xd8, 0x47, 0x7b, 0xe, 0x47, 0x2a, 0x4b, 0x48, 0x36, 0x23, 0x93, 0x28, 0xb3, 0x1e, 0x5, 0x76, 0x64, 0x73, 0xb, 0x2b, 0x5b, 0x10, 0x62, 0x36, 0x21, 0x6f, 0xa3, 0x3c, 0xdd, 0xe2, 0xcf}}
|
||||
return a, nil
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ func _2_messages_indexDownSql() (*asset, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "2_messages_index.down.sql", size: 60, mode: os.FileMode(0664), modTime: time.Unix(1664827710, 0)}
|
||||
info := bindataFileInfo{name: "2_messages_index.down.sql", size: 60, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6e, 0xcb, 0x70, 0x82, 0x33, 0x13, 0x70, 0xd5, 0xbd, 0x3e, 0x68, 0x9, 0x4f, 0x78, 0xa9, 0xc, 0xd6, 0xf4, 0x64, 0xa0, 0x8c, 0xe4, 0x0, 0x15, 0x71, 0xf0, 0x5, 0xdb, 0xa6, 0xf2, 0x12, 0x60}}
|
||||
return a, nil
|
||||
}
|
||||
|
@ -148,7 +148,7 @@ func _2_messages_indexUpSql() (*asset, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "2_messages_index.up.sql", size: 226, mode: os.FileMode(0664), modTime: time.Unix(1664827710, 0)}
|
||||
info := bindataFileInfo{name: "2_messages_index.up.sql", size: 226, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xce, 0xb1, 0xc8, 0x2d, 0xa8, 0x6f, 0x83, 0xfb, 0xf2, 0x40, 0x30, 0xe9, 0xd, 0x18, 0x54, 0xe8, 0xf5, 0xf5, 0xc4, 0x5b, 0xf5, 0xa4, 0x94, 0x50, 0x56, 0x4a, 0xc8, 0x73, 0x3f, 0xf1, 0x56, 0xce}}
|
||||
return a, nil
|
||||
}
|
||||
|
@ -168,7 +168,7 @@ func docGo() (*asset, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1663712987, 0)}
|
||||
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}}
|
||||
return a, nil
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
//go:build !gowaku_skip_migrations
|
||||
// +build !gowaku_skip_migrations
|
||||
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
|
||||
bindata "github.com/golang-migrate/migrate/v4/source/go_bindata"
|
||||
)
|
||||
|
||||
// Migrate applies migrations.
|
||||
func Migrate(db *sql.DB, driver database.Driver) error {
|
||||
return migrateDB(db, bindata.Resource(
|
||||
AssetNames(),
|
||||
Asset,
|
||||
), driver)
|
||||
}
|
||||
|
||||
// Migrate database using provided resources.
|
||||
func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error {
|
||||
source, err := bindata.WithInstance(resources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err := migrate.NewWithInstance(
|
||||
"go-bindata",
|
||||
source,
|
||||
"gowakudb",
|
||||
driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = m.Up(); err != migrate.ErrNoChange {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
//go:build gowaku_skip_migrations
|
||||
// +build gowaku_skip_migrations
|
||||
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// Skip migration code
|
||||
func Migrate(db *sql.DB) error {
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
DROP INDEX IF EXISTS message_senderTimestamp;
|
||||
DROP INDEX IF EXISTS message_receiverTimestamp;
|
||||
DROP TABLE IF EXISTS message;
|
|
@ -0,0 +1,2 @@
|
|||
DROP INDEX IF EXISTS i_msg_1;
|
||||
DROP INDEX IF EXISTS i_msg_2;
|
|
@ -0,0 +1,2 @@
|
|||
CREATE INDEX IF NOT EXISTS i_msg_1 ON message(contentTopic ASC, pubsubTopic ASC, senderTimestamp ASC, id ASC);
|
||||
CREATE INDEX IF NOT EXISTS i_msg_2 ON message(contentTopic DESC, pubsubTopic DESC, senderTimestamp DESC, id DESC);
|
|
@ -0,0 +1,3 @@
|
|||
package sql
|
||||
|
||||
//go:generate go-bindata -pkg migrations -o ../bindata.go ./
|
|
@ -5,8 +5,11 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"github.com/golang-migrate/migrate/v4/database/sqlite3"
|
||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||
"github.com/waku-org/go-waku/waku/persistence"
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
|
||||
)
|
||||
|
||||
// Queries are the sqlite queries for a given table.
|
||||
|
@ -102,25 +105,43 @@ func addSqliteURLDefaults(dburl string) string {
|
|||
return dburl
|
||||
}
|
||||
|
||||
// WithDB is a DBOption that lets you use a sqlite3 DBStore.
|
||||
func WithDB(dburl string) persistence.DBOption {
|
||||
return persistence.WithDriver("sqlite3", addSqliteURLDefaults(dburl), persistence.ConnectionPoolOptions{
|
||||
// Disable concurrent access as not supported by the driver
|
||||
MaxOpenConnections: 1,
|
||||
})
|
||||
// WithDB is a DBOption that lets you use a sqlite3 DBStore and run migrations
|
||||
func WithDB(dburl string, migrate bool) persistence.DBOption {
|
||||
return func(d *persistence.DBStore) error {
|
||||
driverOption := persistence.WithDriver("sqlite3", addSqliteURLDefaults(dburl), persistence.ConnectionPoolOptions{
|
||||
// Disable concurrent access as not supported by the driver
|
||||
MaxOpenConnections: 1,
|
||||
})
|
||||
err := driverOption(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !migrate {
|
||||
return nil
|
||||
}
|
||||
|
||||
migrationOpt := persistence.WithMigrations(Migrate)
|
||||
err = migrationOpt(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewDB creates a sqlite3 DB in the specified path
|
||||
func NewDB(dburl string) (*sql.DB, error) {
|
||||
func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
|
||||
db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Disable concurrent access as not supported by the driver
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
return db, nil
|
||||
return db, Migrate, nil
|
||||
}
|
||||
|
||||
// CreateTable creates the table that will persist the peers
|
||||
|
@ -132,3 +153,17 @@ func CreateTable(db *sql.DB, tableName string) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func migrationDriver(db *sql.DB) (database.Driver, error) {
|
||||
return sqlite3.WithInstance(db, &sqlite3.Config{
|
||||
MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable,
|
||||
})
|
||||
}
|
||||
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := migrationDriver(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return migrations.Migrate(db, migrationDriver)
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/persistence/migrations"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
|
@ -33,7 +32,10 @@ const WALMode = "wal"
|
|||
// DBStore is a MessageProvider that has a *sql.DB connection
|
||||
type DBStore struct {
|
||||
MessageProvider
|
||||
db *sql.DB
|
||||
|
||||
db *sql.DB
|
||||
migrationFn func(db *sql.DB) error
|
||||
|
||||
timesource timesource.Timesource
|
||||
log *zap.Logger
|
||||
|
||||
|
@ -101,19 +103,18 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithMigrationsEnabled is a DBOption used to determine whether migrations should
|
||||
// be executed or not
|
||||
func WithMigrationsEnabled(enabled bool) DBOption {
|
||||
// WithMigrations is a DBOption used to determine if migrations should
|
||||
// be executed, and what driver to use
|
||||
func WithMigrations(migrationFn func(db *sql.DB) error) DBOption {
|
||||
return func(d *DBStore) error {
|
||||
d.enableMigrations = enabled
|
||||
d.enableMigrations = true
|
||||
d.migrationFn = migrationFn
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultOptions() []DBOption {
|
||||
return []DBOption{
|
||||
WithMigrationsEnabled(true),
|
||||
}
|
||||
return []DBOption{}
|
||||
}
|
||||
|
||||
// Creates a new DB store using the db specified via options.
|
||||
|
@ -135,7 +136,7 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
|
|||
}
|
||||
|
||||
if result.enableMigrations {
|
||||
err := migrations.Migrate(result.db)
|
||||
err := result.migrationFn(result.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -164,7 +165,7 @@ func (d *DBStore) cleanOlderRecords() error {
|
|||
// Delete older messages
|
||||
if d.maxDuration > 0 {
|
||||
start := time.Now()
|
||||
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
|
||||
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1`
|
||||
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration)))
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -176,7 +177,7 @@ func (d *DBStore) cleanOlderRecords() error {
|
|||
// Limit number of records to a max N
|
||||
if d.maxMessages > 0 {
|
||||
start := time.Now()
|
||||
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET ?)`
|
||||
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET $1)`
|
||||
_, err := d.db.Exec(sqlStmt, d.maxMessages)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -218,7 +219,7 @@ func (d *DBStore) Stop() {
|
|||
|
||||
// Put inserts a WakuMessage into the DB
|
||||
func (d *DBStore) Put(env *protocol.Envelope) error {
|
||||
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)")
|
||||
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -249,14 +250,15 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version
|
||||
FROM message
|
||||
%s
|
||||
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s
|
||||
LIMIT ?`
|
||||
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s `
|
||||
|
||||
var conditions []string
|
||||
var parameters []interface{}
|
||||
paramCnt := 0
|
||||
|
||||
if query.PubsubTopic != "" {
|
||||
conditions = append(conditions, "pubsubTopic = ?")
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("pubsubTopic = $%d", paramCnt))
|
||||
parameters = append(parameters, query.PubsubTopic)
|
||||
}
|
||||
|
||||
|
@ -264,7 +266,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
var ctPlaceHolder []string
|
||||
for _, ct := range query.ContentFilters {
|
||||
if ct.ContentTopic != "" {
|
||||
ctPlaceHolder = append(ctPlaceHolder, "?")
|
||||
paramCnt++
|
||||
ctPlaceHolder = append(ctPlaceHolder, fmt.Sprintf("$%d", paramCnt))
|
||||
parameters = append(parameters, ct.ContentTopic)
|
||||
}
|
||||
}
|
||||
|
@ -277,7 +280,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
var exists bool
|
||||
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
|
||||
|
||||
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = ?)",
|
||||
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)",
|
||||
cursorDBKey.Bytes(),
|
||||
).Scan(&exists)
|
||||
|
||||
|
@ -290,7 +293,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
|
||||
eqOp = "<"
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("id %s ?", eqOp))
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, paramCnt))
|
||||
|
||||
parameters = append(parameters, cursorDBKey.Bytes())
|
||||
} else {
|
||||
|
@ -300,7 +304,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
|
||||
if query.StartTime != 0 {
|
||||
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
|
||||
conditions = append(conditions, "id >= ?")
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt))
|
||||
startTimeDBKey := NewDBKey(uint64(query.StartTime), uint64(query.StartTime), "", []byte{})
|
||||
parameters = append(parameters, startTimeDBKey.Bytes())
|
||||
}
|
||||
|
@ -309,7 +314,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
|
||||
if query.EndTime != 0 {
|
||||
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD {
|
||||
conditions = append(conditions, "id <= ?")
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt))
|
||||
endTimeDBKey := NewDBKey(uint64(query.EndTime), uint64(query.EndTime), "", []byte{})
|
||||
parameters = append(parameters, endTimeDBKey.Bytes())
|
||||
}
|
||||
|
@ -325,6 +331,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
|||
orderDirection = "DESC"
|
||||
}
|
||||
|
||||
paramCnt++
|
||||
sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt)
|
||||
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection)
|
||||
|
||||
stmt, err := d.db.Prepare(sqlQuery)
|
||||
|
|
|
@ -5,15 +5,27 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4/database/sqlite3"
|
||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := sqlite3.WithInstance(db, &sqlite3.Config{
|
||||
MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return migrations.Migrate(db, migrationDriver)
|
||||
}
|
||||
|
||||
func NewMock() *sql.DB {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
|
@ -25,8 +37,7 @@ func NewMock() *sql.DB {
|
|||
|
||||
func TestDbStore(t *testing.T) {
|
||||
db := NewMock()
|
||||
option := WithDB(db)
|
||||
store, err := NewDBStore(utils.Logger(), option)
|
||||
store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = store.Start(timesource.NewDefaultClock())
|
||||
|
@ -46,7 +57,7 @@ func TestDbStore(t *testing.T) {
|
|||
|
||||
func TestStoreRetention(t *testing.T) {
|
||||
db := NewMock()
|
||||
store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second))
|
||||
store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = store.Start(timesource.NewDefaultClock())
|
||||
|
|
|
@ -69,9 +69,9 @@ func TestConnectionStatusChanges(t *testing.T) {
|
|||
err = node2.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err := sqlite.NewDB(":memory:")
|
||||
db, migration, err := sqlite.NewDB(":memory:")
|
||||
require.NoError(t, err)
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db))
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Node3: Relay + Store
|
||||
|
|
|
@ -182,9 +182,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
|
|||
defer wakuNode1.Stop()
|
||||
|
||||
// NODE2: Filter Client/Store
|
||||
db, err := sqlite.NewDB(":memory:")
|
||||
db, migration, err := sqlite.NewDB(":memory:")
|
||||
require.NoError(t, err)
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db))
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
|
||||
require.NoError(t, err)
|
||||
|
||||
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
|
||||
|
|
|
@ -219,30 +219,30 @@ func (m *PeerExchangeRPC) GetResponse() *PeerExchangeResponse {
|
|||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*PeerInfo)(nil), "PeerInfo")
|
||||
proto.RegisterType((*PeerExchangeQuery)(nil), "PeerExchangeQuery")
|
||||
proto.RegisterType((*PeerExchangeResponse)(nil), "PeerExchangeResponse")
|
||||
proto.RegisterType((*PeerExchangeRPC)(nil), "PeerExchangeRPC")
|
||||
proto.RegisterType((*PeerInfo)(nil), "pb.PeerInfo")
|
||||
proto.RegisterType((*PeerExchangeQuery)(nil), "pb.PeerExchangeQuery")
|
||||
proto.RegisterType((*PeerExchangeResponse)(nil), "pb.PeerExchangeResponse")
|
||||
proto.RegisterType((*PeerExchangeRPC)(nil), "pb.PeerExchangeRPC")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("waku_peer_exchange.proto", fileDescriptor_ce50192ba54b780f) }
|
||||
|
||||
var fileDescriptor_ce50192ba54b780f = []byte{
|
||||
// 211 bytes of a gzipped FileDescriptorProto
|
||||
// 220 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4f, 0xcc, 0x2e,
|
||||
0x8d, 0x2f, 0x48, 0x4d, 0x2d, 0x8a, 0x4f, 0xad, 0x48, 0xce, 0x48, 0xcc, 0x4b, 0x4f, 0xd5, 0x2b,
|
||||
0x28, 0xca, 0x2f, 0xc9, 0x57, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d, 0xf2, 0xcc, 0x4b, 0xcb,
|
||||
0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31,
|
||||
0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x3d, 0x81, 0xa5, 0xa9, 0x45, 0x95, 0x42, 0x52,
|
||||
0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20, 0x38, 0x5f, 0xc9, 0x9e,
|
||||
0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x90, 0x3a, 0x17, 0x67,
|
||||
0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x4e, 0x3d, 0x98, 0xc5, 0x41, 0x08, 0x39,
|
||||
0xa5, 0x3c, 0x2e, 0x7e, 0x14, 0x03, 0x02, 0x9c, 0x85, 0x34, 0xb8, 0x58, 0x0b, 0x41, 0x16, 0x83,
|
||||
0x2d, 0xe3, 0x36, 0x12, 0xd2, 0xc3, 0x70, 0x52, 0x10, 0x44, 0x81, 0x90, 0x21, 0x17, 0x47, 0x11,
|
||||
0xd4, 0x46, 0x09, 0x26, 0xb0, 0x62, 0x51, 0x3d, 0x6c, 0xce, 0x09, 0x82, 0x2b, 0x73, 0x12, 0x38,
|
||||
0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63,
|
||||
0x48, 0x62, 0x03, 0x07, 0x8c, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x99, 0xd5, 0xbb, 0xb6, 0x34,
|
||||
0x01, 0x00, 0x00,
|
||||
0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d,
|
||||
0xf2, 0xcc, 0x4b, 0xcb, 0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4,
|
||||
0xe0, 0x09, 0x02, 0x31, 0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x7d, 0x81, 0xa5, 0xa9,
|
||||
0x45, 0x95, 0x42, 0x52, 0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20,
|
||||
0x38, 0x5f, 0xc9, 0x89, 0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa,
|
||||
0x90, 0x16, 0x17, 0x67, 0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x1e, 0xbd, 0x82,
|
||||
0x24, 0x3d, 0x98, 0xdd, 0x41, 0x08, 0x69, 0xa5, 0x12, 0x2e, 0x7e, 0x14, 0x33, 0x02, 0x9c, 0x85,
|
||||
0xb4, 0xb9, 0x58, 0x0b, 0x41, 0x76, 0x83, 0xed, 0xe3, 0x36, 0x12, 0x85, 0x69, 0x45, 0x71, 0x58,
|
||||
0x10, 0x44, 0x8d, 0x90, 0x09, 0x17, 0x47, 0x11, 0xd4, 0x5e, 0x09, 0x26, 0xb0, 0x7a, 0x09, 0x74,
|
||||
0xf5, 0x30, 0x77, 0x05, 0xc1, 0x55, 0x3a, 0x09, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c,
|
||||
0xe3, 0x83, 0x47, 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x43, 0xc9, 0x18, 0x10,
|
||||
0x00, 0x00, 0xff, 0xff, 0x25, 0x86, 0x8e, 0x1d, 0x41, 0x01, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *PeerInfo) Marshal() (dAtA []byte, err error) {
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package pb;
|
||||
|
||||
message PeerInfo {
|
||||
bytes ENR = 1;
|
||||
}
|
||||
|
|
|
@ -12,10 +12,10 @@ import (
|
|||
|
||||
func MemoryDB(t *testing.T) *persistence.DBStore {
|
||||
var db *sql.DB
|
||||
db, err := sqlite.NewDB(":memory:")
|
||||
db, migration, err := sqlite.NewDB(":memory:")
|
||||
require.NoError(t, err)
|
||||
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db))
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
|
||||
require.NoError(t, err)
|
||||
|
||||
return dbStore
|
||||
|
|
Loading…
Reference in New Issue