diff --git a/Makefile b/Makefile index 79968a68..d1c6505c 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,8 @@ test-ci: _before-cc test _after-cc generate: ${GOBIN} generate ./waku/v2/protocol/pb/generate.go - ${GOBIN} generate ./waku/persistence/migrations/sql + ${GOBIN} generate ./waku/persistence/sqlite/migrations/sql + ${GOBIN} generate ./waku/persistence/postgres/migrations/sql ${GOBIN} generate ./waku/v2/protocol/rln/contracts/generate.go ${GOBIN} generate ./waku/v2/protocol/rln/doc.go diff --git a/go.mod b/go.mod index 31a87e3c..a5d84ae7 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,10 @@ require ( golang.org/x/text v0.4.0 ) -require github.com/waku-org/go-noise v0.0.4 +require ( + github.com/lib/pq v1.10.0 + github.com/waku-org/go-noise v0.0.4 +) require ( github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect diff --git a/waku/db.go b/waku/db.go new file mode 100644 index 00000000..42b96442 --- /dev/null +++ b/waku/db.go @@ -0,0 +1,56 @@ +package waku + +import ( + "database/sql" + "errors" + "regexp" + "strings" + + "github.com/waku-org/go-waku/waku/persistence/postgres" + "github.com/waku-org/go-waku/waku/persistence/sqlite" +) + +func validateDBUrl(val string) error { + matched, err := regexp.Match(`^[\w\+]+:\/\/[\w\/\\\.\:\@]+\?{0,1}.*$`, []byte(val)) + if !matched || err != nil { + return errors.New("invalid db url option format") + } + return nil +} + +func extractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) { + var db *sql.DB + var migrationFn func(*sql.DB) error + var err error + + dbURL := "" + if databaseURL != "" { + err := validateDBUrl(databaseURL) + if err != nil { + return nil, nil, err + } + dbURL = databaseURL + } else { + // In memoryDB + dbURL = "sqlite://:memory:" + } + + dbURLParts := strings.Split(dbURL, "://") + dbEngine := dbURLParts[0] + dbParams := dbURLParts[1] + switch dbEngine { + case "sqlite3": + db, migrationFn, err = sqlite.NewDB(dbParams) + case "postgresql": + db, migrationFn, err = postgres.NewDB(dbURL) + default: + err = errors.New("unsupported database engine") + } + + if err != nil { + return nil, nil, err + } + + return db, migrationFn, nil + +} diff --git a/waku/node.go b/waku/node.go index 0599bbe3..bb2f0122 100644 --- a/waku/node.go +++ b/waku/node.go @@ -5,14 +5,11 @@ import ( "crypto/ecdsa" "database/sql" "encoding/json" - "errors" "fmt" "io/ioutil" "net" "os" "os/signal" - "regexp" - "strings" "sync" "syscall" "time" @@ -79,14 +76,6 @@ func freePort() (int, error) { return port, nil } -func validateDBUrl(val string) error { - matched, err := regexp.Match(`^[\w\+]+:\/\/[\w\/\\\.\:\@]+\?{0,1}.*$`, []byte(val)) - if !matched || err != nil { - return errors.New("invalid db url option format") - } - return nil -} - const dialTimeout = 7 * time.Second // Execute starts a go-waku node with settings determined by the Options parameter @@ -112,32 +101,10 @@ func Execute(options Options) { logger := utils.Logger().With(logging.HostID("node", id)) var db *sql.DB + var migrationFn func(*sql.DB) error if options.Store.Enable { - dbURL := "" - if options.Store.DatabaseURL != "" { - err := validateDBUrl(options.Store.DatabaseURL) - failOnErr(err, "connecting to the db") - dbURL = options.Store.DatabaseURL - } else { - // In memoryDB - dbURL = "sqlite://:memory:" - } - - // TODO: this should be refactored to use any kind of DB, not just sqlite - // Extract to separate module - - dbURLParts := strings.Split(dbURL, "://") - dbEngine := dbURLParts[0] - dbParams := dbURLParts[1] - switch dbEngine { - case "sqlite3": - db, err = sqlite.NewDB(dbParams) - failOnErr(err, "Could not connect to DB") - logger.Info("using database: ", zap.String("path", dbParams)) - default: - failOnErr(errors.New("unknown database engine"), fmt.Sprintf("%s is not supported by go-waku", dbEngine)) - } - + db, migrationFn, err = extractDBAndMigration(options.Store.DatabaseURL) + failOnErr(err, "Could not connect to DB") } ctx := context.Background() @@ -227,10 +194,13 @@ func Execute(options Options) { if options.Store.Enable { nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...)) - dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime)) + dbStore, err := persistence.NewDBStore(logger, + persistence.WithDB(db), + persistence.WithMigrations(migrationFn), + persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime), + ) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) - } if options.LightPush.Enable { diff --git a/waku/persistence/postgres/migrations/bindata.go b/waku/persistence/postgres/migrations/bindata.go new file mode 100644 index 00000000..7651f0cd --- /dev/null +++ b/waku/persistence/postgres/migrations/bindata.go @@ -0,0 +1,367 @@ +// Code generated by go-bindata. DO NOT EDIT. +// sources: +// 1_messages.down.sql (124B) +// 1_messages.up.sql (452B) +// 2_messages_index.down.sql (60B) +// 2_messages_index.up.sql (226B) +// doc.go (74B) + +package migrations + +import ( + "bytes" + "compress/gzip" + "crypto/sha256" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "time" +) + +func bindataRead(data []byte, name string) ([]byte, error) { + gz, err := gzip.NewReader(bytes.NewBuffer(data)) + if err != nil { + return nil, fmt.Errorf("read %q: %v", name, err) + } + + var buf bytes.Buffer + _, err = io.Copy(&buf, gz) + clErr := gz.Close() + + if err != nil { + return nil, fmt.Errorf("read %q: %v", name, err) + } + if clErr != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +type asset struct { + bytes []byte + info os.FileInfo + digest [sha256.Size]byte +} + +type bindataFileInfo struct { + name string + size int64 + mode os.FileMode + modTime time.Time +} + +func (fi bindataFileInfo) Name() string { + return fi.name +} +func (fi bindataFileInfo) Size() int64 { + return fi.size +} +func (fi bindataFileInfo) Mode() os.FileMode { + return fi.mode +} +func (fi bindataFileInfo) ModTime() time.Time { + return fi.modTime +} +func (fi bindataFileInfo) IsDir() bool { + return false +} +func (fi bindataFileInfo) Sys() interface{} { + return nil +} + +var __1_messagesDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4d\x2d\x2e\x4e\x4c\x4f\x8d\x2f\x4e\xcd\x4b\x49\x2d\x0a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\xb0\xe6\xc2\xab\xba\x28\x35\x39\x35\xb3\x0c\x53\x7d\x88\xa3\x93\x8f\x2b\xa6\x7a\x6b\x2e\x40\x00\x00\x00\xff\xff\xc2\x48\x8c\x05\x7c\x00\x00\x00") + +func _1_messagesDownSqlBytes() ([]byte, error) { + return bindataRead( + __1_messagesDownSql, + "1_messages.down.sql", + ) +} + +func _1_messagesDownSql() (*asset, error) { + bytes, err := _1_messagesDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1_messages.down.sql", size: 124, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xff, 0x4a, 0x8e, 0xa9, 0xd9, 0xa8, 0xa4, 0x73, 0x3a, 0x54, 0xe4, 0x35, 0xfd, 0xea, 0x87, 0x4c, 0xa, 0x5c, 0xc0, 0xc9, 0xe7, 0x8, 0x8c, 0x6f, 0x60, 0x9e, 0x54, 0x77, 0x59, 0xd0, 0x2b, 0xfe}} + return a, nil +} + +var __1_messagesUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x90\x41\x4f\x83\x40\x10\x85\xcf\xec\xaf\x98\x23\x24\x1c\xbc\x73\x5a\xda\x69\x33\x11\x17\xb3\x4c\x93\x72\x32\x14\x26\x66\x13\x59\x08\x4b\x1b\xfd\xf7\x46\xad\x4a\x5a\x35\x9e\xbf\x37\x6f\xde\x7b\x2b\x8b\x9a\x11\x58\xe7\x05\x02\x6d\xc0\x94\x0c\xb8\xa7\x8a\x2b\xe8\x25\x84\xe6\x51\x20\x56\x91\xeb\x20\xaf\x19\x75\xaa\xa2\x49\x5a\x71\x27\x99\xd8\xf5\x12\xe6\xa6\x1f\x21\xa7\x2d\x19\x7e\xbf\x34\xbb\xa2\x48\x55\x14\xc4\x77\x7f\x2b\xda\xc1\xcf\xe2\x67\x1e\x46\xd7\x7e\x58\x2f\xe9\x78\x3c\x84\xe3\xe1\x37\xd8\xbc\x3c\x0d\xcd\x77\xa0\x93\x4c\xc1\x0d\x1e\xc8\x30\x6e\xd1\x7e\x49\x61\x8d\x1b\xbd\x2b\x18\x6e\x52\x15\xad\x4a\x53\xb1\xd5\x6f\x29\xce\xb5\xc8\x77\xf2\x0c\xf7\x96\xee\xb4\xad\xe1\x16\x6b\x88\x5d\x97\xc2\xe2\x75\xa2\x92\x4c\xa9\xf3\x40\x64\xd6\xb8\xff\x79\xa0\x87\xcb\xba\xa5\xf9\x44\xf1\x05\x4a\xb2\xff\xf8\x5d\x4f\xbc\x70\xbc\x82\x49\xf6\x1a\x00\x00\xff\xff\xa0\x46\xcd\x13\xc4\x01\x00\x00") + +func _1_messagesUpSqlBytes() ([]byte, error) { + return bindataRead( + __1_messagesUpSql, + "1_messages.up.sql", + ) +} + +func _1_messagesUpSql() (*asset, error) { + bytes, err := _1_messagesUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1_messages.up.sql", size: 452, mode: os.FileMode(0664), modTime: time.Unix(1672853147, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe4, 0x17, 0xde, 0xd4, 0x55, 0x47, 0x7f, 0x61, 0xe6, 0xbd, 0x2e, 0x89, 0xb5, 0x7, 0xe1, 0x31, 0x1b, 0xd3, 0x20, 0x3d, 0x3e, 0x68, 0x54, 0xfe, 0xd3, 0x62, 0x51, 0x87, 0x5f, 0xbf, 0x57, 0x64}} + return a, nil +} + +var __2_messages_indexDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x8c\xcf\x2d\x4e\x8f\x37\xb4\xe6\xc2\x23\x6b\x64\xcd\x05\x08\x00\x00\xff\xff\x53\x77\x9e\x4d\x3c\x00\x00\x00") + +func _2_messages_indexDownSqlBytes() ([]byte, error) { + return bindataRead( + __2_messages_indexDownSql, + "2_messages_index.down.sql", + ) +} + +func _2_messages_indexDownSql() (*asset, error) { + bytes, err := _2_messages_indexDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "2_messages_index.down.sql", size: 60, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6e, 0xcb, 0x70, 0x82, 0x33, 0x13, 0x70, 0xd5, 0xbd, 0x3e, 0x68, 0x9, 0x4f, 0x78, 0xa9, 0xc, 0xd6, 0xf4, 0x64, 0xa0, 0x8c, 0xe4, 0x0, 0x15, 0x71, 0xf0, 0x5, 0xdb, 0xa6, 0xf2, 0x12, 0x60}} + return a, nil +} + +var __2_messages_indexUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\xf0\xf3\x0f\x51\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x8c\xcf\x2d\x4e\x8f\x37\x54\xf0\xf7\x53\xc8\x4d\x2d\x2e\x4e\x4c\x4f\xd5\x48\xce\xcf\x2b\x49\xcd\x2b\x09\xc9\x2f\xc8\x4c\x56\x70\x0c\x76\xd6\x51\x28\x28\x4d\x2a\x2e\x4d\x42\x12\x28\x4e\xcd\x4b\x49\x2d\x0a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x80\x08\x66\xa6\x80\x68\x4d\x6b\x2e\x82\xd6\x19\xe1\xb4\xce\xc5\x15\xdd\x3e\x88\x08\xba\x85\x10\xd1\xcc\x14\x30\x43\xd3\x9a\x0b\x10\x00\x00\xff\xff\x2a\x3b\xab\xf4\xe2\x00\x00\x00") + +func _2_messages_indexUpSqlBytes() ([]byte, error) { + return bindataRead( + __2_messages_indexUpSql, + "2_messages_index.up.sql", + ) +} + +func _2_messages_indexUpSql() (*asset, error) { + bytes, err := _2_messages_indexUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "2_messages_index.up.sql", size: 226, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xce, 0xb1, 0xc8, 0x2d, 0xa8, 0x6f, 0x83, 0xfb, 0xf2, 0x40, 0x30, 0xe9, 0xd, 0x18, 0x54, 0xe8, 0xf5, 0xf5, 0xc4, 0x5b, 0xf5, 0xa4, 0x94, 0x50, 0x56, 0x4a, 0xc8, 0x73, 0x3f, 0xf1, 0x56, 0xce}} + return a, nil +} + +var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") + +func docGoBytes() ([]byte, error) { + return bindataRead( + _docGo, + "doc.go", + ) +} + +func docGo() (*asset, error) { + bytes, err := docGoBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1672850685, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}} + return a, nil +} + +// Asset loads and returns the asset for the given name. +// It returns an error if the asset could not be found or +// could not be loaded. +func Asset(name string) ([]byte, error) { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { + a, err := f() + if err != nil { + return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err) + } + return a.bytes, nil + } + return nil, fmt.Errorf("Asset %s not found", name) +} + +// AssetString returns the asset contents as a string (instead of a []byte). +func AssetString(name string) (string, error) { + data, err := Asset(name) + return string(data), err +} + +// MustAsset is like Asset but panics when Asset would return an error. +// It simplifies safe initialization of global variables. +func MustAsset(name string) []byte { + a, err := Asset(name) + if err != nil { + panic("asset: Asset(" + name + "): " + err.Error()) + } + + return a +} + +// MustAssetString is like AssetString but panics when Asset would return an +// error. It simplifies safe initialization of global variables. +func MustAssetString(name string) string { + return string(MustAsset(name)) +} + +// AssetInfo loads and returns the asset info for the given name. +// It returns an error if the asset could not be found or +// could not be loaded. +func AssetInfo(name string) (os.FileInfo, error) { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { + a, err := f() + if err != nil { + return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err) + } + return a.info, nil + } + return nil, fmt.Errorf("AssetInfo %s not found", name) +} + +// AssetDigest returns the digest of the file with the given name. It returns an +// error if the asset could not be found or the digest could not be loaded. +func AssetDigest(name string) ([sha256.Size]byte, error) { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { + a, err := f() + if err != nil { + return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err) + } + return a.digest, nil + } + return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name) +} + +// Digests returns a map of all known files and their checksums. +func Digests() (map[string][sha256.Size]byte, error) { + mp := make(map[string][sha256.Size]byte, len(_bindata)) + for name := range _bindata { + a, err := _bindata[name]() + if err != nil { + return nil, err + } + mp[name] = a.digest + } + return mp, nil +} + +// AssetNames returns the names of the assets. +func AssetNames() []string { + names := make([]string, 0, len(_bindata)) + for name := range _bindata { + names = append(names, name) + } + return names +} + +// _bindata is a table, holding each asset generator, mapped to its name. +var _bindata = map[string]func() (*asset, error){ + "1_messages.down.sql": _1_messagesDownSql, + + "1_messages.up.sql": _1_messagesUpSql, + + "2_messages_index.down.sql": _2_messages_indexDownSql, + + "2_messages_index.up.sql": _2_messages_indexUpSql, + + "doc.go": docGo, +} + +// AssetDir returns the file names below a certain +// directory embedded in the file by go-bindata. +// For example if you run go-bindata on data/... and data contains the +// following hierarchy: +// data/ +// foo.txt +// img/ +// a.png +// b.png +// then AssetDir("data") would return []string{"foo.txt", "img"}, +// AssetDir("data/img") would return []string{"a.png", "b.png"}, +// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and +// AssetDir("") will return []string{"data"}. +func AssetDir(name string) ([]string, error) { + node := _bintree + if len(name) != 0 { + canonicalName := strings.Replace(name, "\\", "/", -1) + pathList := strings.Split(canonicalName, "/") + for _, p := range pathList { + node = node.Children[p] + if node == nil { + return nil, fmt.Errorf("Asset %s not found", name) + } + } + } + if node.Func != nil { + return nil, fmt.Errorf("Asset %s not found", name) + } + rv := make([]string, 0, len(node.Children)) + for childName := range node.Children { + rv = append(rv, childName) + } + return rv, nil +} + +type bintree struct { + Func func() (*asset, error) + Children map[string]*bintree +} + +var _bintree = &bintree{nil, map[string]*bintree{ + "1_messages.down.sql": &bintree{_1_messagesDownSql, map[string]*bintree{}}, + "1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}}, + "2_messages_index.down.sql": &bintree{_2_messages_indexDownSql, map[string]*bintree{}}, + "2_messages_index.up.sql": &bintree{_2_messages_indexUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, +}} + +// RestoreAsset restores an asset under the given directory. +func RestoreAsset(dir, name string) error { + data, err := Asset(name) + if err != nil { + return err + } + info, err := AssetInfo(name) + if err != nil { + return err + } + err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755)) + if err != nil { + return err + } + err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) + if err != nil { + return err + } + return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) +} + +// RestoreAssets restores an asset under the given directory recursively. +func RestoreAssets(dir, name string) error { + children, err := AssetDir(name) + // File + if err != nil { + return RestoreAsset(dir, name) + } + // Dir + for _, child := range children { + err = RestoreAssets(dir, filepath.Join(name, child)) + if err != nil { + return err + } + } + return nil +} + +func _filePath(dir, name string) string { + canonicalName := strings.Replace(name, "\\", "/", -1) + return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...) +} diff --git a/waku/persistence/migrations/migrate.go b/waku/persistence/postgres/migrations/migrate.go similarity index 61% rename from waku/persistence/migrations/migrate.go rename to waku/persistence/postgres/migrations/migrate.go index a1f00154..dd5c733b 100644 --- a/waku/persistence/migrations/migrate.go +++ b/waku/persistence/postgres/migrations/migrate.go @@ -7,38 +7,30 @@ import ( "database/sql" "github.com/golang-migrate/migrate/v4" - "github.com/golang-migrate/migrate/v4/database/sqlite3" + "github.com/golang-migrate/migrate/v4/database" + bindata "github.com/golang-migrate/migrate/v4/source/go_bindata" ) // Migrate applies migrations. -func Migrate(db *sql.DB) error { +func Migrate(db *sql.DB, driver database.Driver) error { return migrateDB(db, bindata.Resource( AssetNames(), - func(name string) ([]byte, error) { - return Asset(name) - }, - )) + Asset, + ), driver) } // Migrate database using provided resources. -func migrateDB(db *sql.DB, resources *bindata.AssetSource) error { +func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error { source, err := bindata.WithInstance(resources) if err != nil { return err } - driver, err := sqlite3.WithInstance(db, &sqlite3.Config{ - MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, - }) - if err != nil { - return err - } - m, err := migrate.NewWithInstance( "go-bindata", source, - "sqlite", + "gowakudb", driver) if err != nil { return err diff --git a/waku/persistence/migrations/no_migrations.go b/waku/persistence/postgres/migrations/no_migrations.go similarity index 100% rename from waku/persistence/migrations/no_migrations.go rename to waku/persistence/postgres/migrations/no_migrations.go diff --git a/waku/persistence/migrations/sql/1_messages.down.sql b/waku/persistence/postgres/migrations/sql/1_messages.down.sql similarity index 100% rename from waku/persistence/migrations/sql/1_messages.down.sql rename to waku/persistence/postgres/migrations/sql/1_messages.down.sql diff --git a/waku/persistence/postgres/migrations/sql/1_messages.up.sql b/waku/persistence/postgres/migrations/sql/1_messages.up.sql new file mode 100644 index 00000000..ec879cb0 --- /dev/null +++ b/waku/persistence/postgres/migrations/sql/1_messages.up.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS message ( + id BYTEA, + receiverTimestamp BIGINT NOT NULL, + senderTimestamp BIGINT NOT NULL, + contentTopic BYTEA NOT NULL, + pubsubTopic BYTEA NOT NULL, + payload BYTEA, + version INTEGER NOT NULL DEFAULT 0, + CONSTRAINT messageIndex PRIMARY KEY (id, pubsubTopic) +); + +CREATE INDEX IF NOT EXISTS message_senderTimestamp ON message(senderTimestamp); +CREATE INDEX IF NOT EXISTS message_receiverTimestamp ON message(receiverTimestamp); \ No newline at end of file diff --git a/waku/persistence/migrations/sql/2_messages_index.down.sql b/waku/persistence/postgres/migrations/sql/2_messages_index.down.sql similarity index 100% rename from waku/persistence/migrations/sql/2_messages_index.down.sql rename to waku/persistence/postgres/migrations/sql/2_messages_index.down.sql diff --git a/waku/persistence/migrations/sql/2_messages_index.up.sql b/waku/persistence/postgres/migrations/sql/2_messages_index.up.sql similarity index 100% rename from waku/persistence/migrations/sql/2_messages_index.up.sql rename to waku/persistence/postgres/migrations/sql/2_messages_index.up.sql diff --git a/waku/persistence/migrations/sql/doc.go b/waku/persistence/postgres/migrations/sql/doc.go similarity index 100% rename from waku/persistence/migrations/sql/doc.go rename to waku/persistence/postgres/migrations/sql/doc.go diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go new file mode 100644 index 00000000..d66cfd49 --- /dev/null +++ b/waku/persistence/postgres/postgres.go @@ -0,0 +1,146 @@ +package postgres + +import ( + "database/sql" + "fmt" + + "github.com/golang-migrate/migrate/v4/database" + "github.com/golang-migrate/migrate/v4/database/postgres" + _ "github.com/lib/pq" + "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/postgres/migrations" +) + +// Queries are the postgresql queries for a given table. +type Queries struct { + deleteQuery string + existsQuery string + getQuery string + putQuery string + queryQuery string + prefixQuery string + limitQuery string + offsetQuery string + getSizeQuery string +} + +// NewQueries creates a new Postgresql set of queries for the passed table +func NewQueries(tbl string, db *sql.DB) (*Queries, error) { + err := CreateTable(db, tbl) + if err != nil { + return nil, err + } + return &Queries{ + deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), + existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), + getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl), + putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl), + queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl), + prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`, + limitQuery: ` LIMIT %d`, + offsetQuery: ` OFFSET %d`, + getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl), + }, nil +} + +// Delete returns the query for deleting a row. +func (q Queries) Delete() string { + return q.deleteQuery +} + +// Exists returns the query for determining if a row exists. +func (q Queries) Exists() string { + return q.existsQuery +} + +// Get returns the query for getting a row. +func (q Queries) Get() string { + return q.getQuery +} + +// Put returns the query for putting a row. +func (q Queries) Put() string { + return q.putQuery +} + +// Query returns the query for getting multiple rows. +func (q Queries) Query() string { + return q.queryQuery +} + +// Prefix returns the query fragment for getting a rows with a key prefix. +func (q Queries) Prefix() string { + return q.prefixQuery +} + +// Limit returns the query fragment for limiting results. +func (q Queries) Limit() string { + return q.limitQuery +} + +// Offset returns the query fragment for returning rows from a given offset. +func (q Queries) Offset() string { + return q.offsetQuery +} + +// GetSize returns the query for determining the size of a value. +func (q Queries) GetSize() string { + return q.getSizeQuery +} + +// WithDB is a DBOption that lets you use a postgresql DBStore and run migrations +func WithDB(dburl string, migrate bool) persistence.DBOption { + return func(d *persistence.DBStore) error { + driverOption := persistence.WithDriver("postgres", dburl) + err := driverOption(d) + if err != nil { + return err + } + + if !migrate { + return nil + } + + migrationOpt := persistence.WithMigrations(Migrate) + err = migrationOpt(d) + if err != nil { + return err + } + + return nil + } +} + +// NewDB connects to postgres DB in the specified path +func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { + db, err := sql.Open("postgres", dburl) + if err != nil { + return nil, nil, err + } + + return db, Migrate, nil +} + +func migrationDriver(db *sql.DB) (database.Driver, error) { + return postgres.WithInstance(db, &postgres.Config{ + MigrationsTable: "gowaku_" + postgres.DefaultMigrationsTable, + }) +} + +// CreateTable creates the table that will persist the peers +func CreateTable(db *sql.DB, tableName string) error { + sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName) + _, err := db.Exec(sqlStmt) + if err != nil { + return err + } + return nil +} + +func Migrate(db *sql.DB) error { + migrationDriver, err := migrationDriver(db) + if err != nil { + return err + } + return migrations.Migrate(db, migrationDriver) +} diff --git a/waku/persistence/migrations/bindata.go b/waku/persistence/sqlite/migrations/bindata.go similarity index 97% rename from waku/persistence/migrations/bindata.go rename to waku/persistence/sqlite/migrations/bindata.go index 7d139fba..31d1b5d7 100644 --- a/waku/persistence/migrations/bindata.go +++ b/waku/persistence/sqlite/migrations/bindata.go @@ -88,7 +88,7 @@ func _1_messagesDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_messages.down.sql", size: 124, mode: os.FileMode(0664), modTime: time.Unix(1663712987, 0)} + info := bindataFileInfo{name: "1_messages.down.sql", size: 124, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xff, 0x4a, 0x8e, 0xa9, 0xd9, 0xa8, 0xa4, 0x73, 0x3a, 0x54, 0xe4, 0x35, 0xfd, 0xea, 0x87, 0x4c, 0xa, 0x5c, 0xc0, 0xc9, 0xe7, 0x8, 0x8c, 0x6f, 0x60, 0x9e, 0x54, 0x77, 0x59, 0xd0, 0x2b, 0xfe}} return a, nil } @@ -108,7 +108,7 @@ func _1_messagesUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_messages.up.sql", size: 464, mode: os.FileMode(0664), modTime: time.Unix(1663712987, 0)} + info := bindataFileInfo{name: "1_messages.up.sql", size: 464, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4, 0xd8, 0x47, 0x7b, 0xe, 0x47, 0x2a, 0x4b, 0x48, 0x36, 0x23, 0x93, 0x28, 0xb3, 0x1e, 0x5, 0x76, 0x64, 0x73, 0xb, 0x2b, 0x5b, 0x10, 0x62, 0x36, 0x21, 0x6f, 0xa3, 0x3c, 0xdd, 0xe2, 0xcf}} return a, nil } @@ -128,7 +128,7 @@ func _2_messages_indexDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "2_messages_index.down.sql", size: 60, mode: os.FileMode(0664), modTime: time.Unix(1664827710, 0)} + info := bindataFileInfo{name: "2_messages_index.down.sql", size: 60, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6e, 0xcb, 0x70, 0x82, 0x33, 0x13, 0x70, 0xd5, 0xbd, 0x3e, 0x68, 0x9, 0x4f, 0x78, 0xa9, 0xc, 0xd6, 0xf4, 0x64, 0xa0, 0x8c, 0xe4, 0x0, 0x15, 0x71, 0xf0, 0x5, 0xdb, 0xa6, 0xf2, 0x12, 0x60}} return a, nil } @@ -148,7 +148,7 @@ func _2_messages_indexUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "2_messages_index.up.sql", size: 226, mode: os.FileMode(0664), modTime: time.Unix(1664827710, 0)} + info := bindataFileInfo{name: "2_messages_index.up.sql", size: 226, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xce, 0xb1, 0xc8, 0x2d, 0xa8, 0x6f, 0x83, 0xfb, 0xf2, 0x40, 0x30, 0xe9, 0xd, 0x18, 0x54, 0xe8, 0xf5, 0xf5, 0xc4, 0x5b, 0xf5, 0xa4, 0x94, 0x50, 0x56, 0x4a, 0xc8, 0x73, 0x3f, 0xf1, 0x56, 0xce}} return a, nil } @@ -168,7 +168,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1663712987, 0)} + info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1667483667, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}} return a, nil } diff --git a/waku/persistence/sqlite/migrations/migrate.go b/waku/persistence/sqlite/migrations/migrate.go new file mode 100644 index 00000000..dd5c733b --- /dev/null +++ b/waku/persistence/sqlite/migrations/migrate.go @@ -0,0 +1,43 @@ +//go:build !gowaku_skip_migrations +// +build !gowaku_skip_migrations + +package migrations + +import ( + "database/sql" + + "github.com/golang-migrate/migrate/v4" + "github.com/golang-migrate/migrate/v4/database" + + bindata "github.com/golang-migrate/migrate/v4/source/go_bindata" +) + +// Migrate applies migrations. +func Migrate(db *sql.DB, driver database.Driver) error { + return migrateDB(db, bindata.Resource( + AssetNames(), + Asset, + ), driver) +} + +// Migrate database using provided resources. +func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error { + source, err := bindata.WithInstance(resources) + if err != nil { + return err + } + + m, err := migrate.NewWithInstance( + "go-bindata", + source, + "gowakudb", + driver) + if err != nil { + return err + } + + if err = m.Up(); err != migrate.ErrNoChange { + return err + } + return nil +} diff --git a/waku/persistence/sqlite/migrations/no_migrations.go b/waku/persistence/sqlite/migrations/no_migrations.go new file mode 100644 index 00000000..ae2297f3 --- /dev/null +++ b/waku/persistence/sqlite/migrations/no_migrations.go @@ -0,0 +1,13 @@ +//go:build gowaku_skip_migrations +// +build gowaku_skip_migrations + +package migrations + +import ( + "database/sql" +) + +// Skip migration code +func Migrate(db *sql.DB) error { + return nil +} diff --git a/waku/persistence/sqlite/migrations/sql/1_messages.down.sql b/waku/persistence/sqlite/migrations/sql/1_messages.down.sql new file mode 100644 index 00000000..26090600 --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/1_messages.down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS message_senderTimestamp; +DROP INDEX IF EXISTS message_receiverTimestamp; +DROP TABLE IF EXISTS message; diff --git a/waku/persistence/migrations/sql/1_messages.up.sql b/waku/persistence/sqlite/migrations/sql/1_messages.up.sql similarity index 100% rename from waku/persistence/migrations/sql/1_messages.up.sql rename to waku/persistence/sqlite/migrations/sql/1_messages.up.sql diff --git a/waku/persistence/sqlite/migrations/sql/2_messages_index.down.sql b/waku/persistence/sqlite/migrations/sql/2_messages_index.down.sql new file mode 100644 index 00000000..f12aa3db --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/2_messages_index.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS i_msg_1; +DROP INDEX IF EXISTS i_msg_2; diff --git a/waku/persistence/sqlite/migrations/sql/2_messages_index.up.sql b/waku/persistence/sqlite/migrations/sql/2_messages_index.up.sql new file mode 100644 index 00000000..74014596 --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/2_messages_index.up.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS i_msg_1 ON message(contentTopic ASC, pubsubTopic ASC, senderTimestamp ASC, id ASC); +CREATE INDEX IF NOT EXISTS i_msg_2 ON message(contentTopic DESC, pubsubTopic DESC, senderTimestamp DESC, id DESC); diff --git a/waku/persistence/sqlite/migrations/sql/doc.go b/waku/persistence/sqlite/migrations/sql/doc.go new file mode 100644 index 00000000..e0a06039 --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/doc.go @@ -0,0 +1,3 @@ +package sql + +//go:generate go-bindata -pkg migrations -o ../bindata.go ./ diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index 851b091b..a0ef8243 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -5,8 +5,11 @@ import ( "fmt" "strings" + "github.com/golang-migrate/migrate/v4/database" + "github.com/golang-migrate/migrate/v4/database/sqlite3" _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" ) // Queries are the sqlite queries for a given table. @@ -102,25 +105,43 @@ func addSqliteURLDefaults(dburl string) string { return dburl } -// WithDB is a DBOption that lets you use a sqlite3 DBStore. -func WithDB(dburl string) persistence.DBOption { - return persistence.WithDriver("sqlite3", addSqliteURLDefaults(dburl), persistence.ConnectionPoolOptions{ - // Disable concurrent access as not supported by the driver - MaxOpenConnections: 1, - }) +// WithDB is a DBOption that lets you use a sqlite3 DBStore and run migrations +func WithDB(dburl string, migrate bool) persistence.DBOption { + return func(d *persistence.DBStore) error { + driverOption := persistence.WithDriver("sqlite3", addSqliteURLDefaults(dburl), persistence.ConnectionPoolOptions{ + // Disable concurrent access as not supported by the driver + MaxOpenConnections: 1, + }) + err := driverOption(d) + if err != nil { + return err + } + + if !migrate { + return nil + } + + migrationOpt := persistence.WithMigrations(Migrate) + err = migrationOpt(d) + if err != nil { + return err + } + + return nil + } } // NewDB creates a sqlite3 DB in the specified path -func NewDB(dburl string) (*sql.DB, error) { +func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl)) if err != nil { - return nil, err + return nil, nil, err } // Disable concurrent access as not supported by the driver db.SetMaxOpenConns(1) - return db, nil + return db, Migrate, nil } // CreateTable creates the table that will persist the peers @@ -132,3 +153,17 @@ func CreateTable(db *sql.DB, tableName string) error { } return nil } + +func migrationDriver(db *sql.DB) (database.Driver, error) { + return sqlite3.WithInstance(db, &sqlite3.Config{ + MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, + }) +} + +func Migrate(db *sql.DB) error { + migrationDriver, err := migrationDriver(db) + if err != nil { + return err + } + return migrations.Migrate(db, migrationDriver) +} diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 7959a0b1..00183752 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/waku-org/go-waku/waku/persistence/migrations" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -33,7 +32,10 @@ const WALMode = "wal" // DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { MessageProvider - db *sql.DB + + db *sql.DB + migrationFn func(db *sql.DB) error + timesource timesource.Timesource log *zap.Logger @@ -101,19 +103,18 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { } } -// WithMigrationsEnabled is a DBOption used to determine whether migrations should -// be executed or not -func WithMigrationsEnabled(enabled bool) DBOption { +// WithMigrations is a DBOption used to determine if migrations should +// be executed, and what driver to use +func WithMigrations(migrationFn func(db *sql.DB) error) DBOption { return func(d *DBStore) error { - d.enableMigrations = enabled + d.enableMigrations = true + d.migrationFn = migrationFn return nil } } func DefaultOptions() []DBOption { - return []DBOption{ - WithMigrationsEnabled(true), - } + return []DBOption{} } // Creates a new DB store using the db specified via options. @@ -135,7 +136,7 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { } if result.enableMigrations { - err := migrations.Migrate(result.db) + err := result.migrationFn(result.db) if err != nil { return nil, err } @@ -164,7 +165,7 @@ func (d *DBStore) cleanOlderRecords() error { // Delete older messages if d.maxDuration > 0 { start := time.Now() - sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?` + sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1` _, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration))) if err != nil { return err @@ -176,7 +177,7 @@ func (d *DBStore) cleanOlderRecords() error { // Limit number of records to a max N if d.maxMessages > 0 { start := time.Now() - sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET ?)` + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET $1)` _, err := d.db.Exec(sqlStmt, d.maxMessages) if err != nil { return err @@ -218,7 +219,7 @@ func (d *DBStore) Stop() { // Put inserts a WakuMessage into the DB func (d *DBStore) Put(env *protocol.Envelope) error { - stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)") + stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)") if err != nil { return err } @@ -249,14 +250,15 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message %s - ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s - LIMIT ?` + ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s ` var conditions []string var parameters []interface{} + paramCnt := 0 if query.PubsubTopic != "" { - conditions = append(conditions, "pubsubTopic = ?") + paramCnt++ + conditions = append(conditions, fmt.Sprintf("pubsubTopic = $%d", paramCnt)) parameters = append(parameters, query.PubsubTopic) } @@ -264,7 +266,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err var ctPlaceHolder []string for _, ct := range query.ContentFilters { if ct.ContentTopic != "" { - ctPlaceHolder = append(ctPlaceHolder, "?") + paramCnt++ + ctPlaceHolder = append(ctPlaceHolder, fmt.Sprintf("$%d", paramCnt)) parameters = append(parameters, ct.ContentTopic) } } @@ -277,7 +280,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err var exists bool cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) - err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = ?)", + err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)", cursorDBKey.Bytes(), ).Scan(&exists) @@ -290,7 +293,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { eqOp = "<" } - conditions = append(conditions, fmt.Sprintf("id %s ?", eqOp)) + paramCnt++ + conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, paramCnt)) parameters = append(parameters, cursorDBKey.Bytes()) } else { @@ -300,7 +304,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err if query.StartTime != 0 { if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { - conditions = append(conditions, "id >= ?") + paramCnt++ + conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt)) startTimeDBKey := NewDBKey(uint64(query.StartTime), uint64(query.StartTime), "", []byte{}) parameters = append(parameters, startTimeDBKey.Bytes()) } @@ -309,7 +314,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err if query.EndTime != 0 { if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD { - conditions = append(conditions, "id <= ?") + paramCnt++ + conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt)) endTimeDBKey := NewDBKey(uint64(query.EndTime), uint64(query.EndTime), "", []byte{}) parameters = append(parameters, endTimeDBKey.Bytes()) } @@ -325,6 +331,8 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err orderDirection = "DESC" } + paramCnt++ + sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt) sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection) stmt, err := d.db.Prepare(sqlQuery) diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 05c2b6c0..def9b154 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -5,15 +5,27 @@ import ( "testing" "time" + "github.com/golang-migrate/migrate/v4/database/sqlite3" _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) +func Migrate(db *sql.DB) error { + migrationDriver, err := sqlite3.WithInstance(db, &sqlite3.Config{ + MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, + }) + if err != nil { + return err + } + return migrations.Migrate(db, migrationDriver) +} + func NewMock() *sql.DB { db, err := sql.Open("sqlite3", ":memory:") if err != nil { @@ -25,8 +37,7 @@ func NewMock() *sql.DB { func TestDbStore(t *testing.T) { db := NewMock() - option := WithDB(db) - store, err := NewDBStore(utils.Logger(), option) + store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate)) require.NoError(t, err) err = store.Start(timesource.NewDefaultClock()) @@ -46,7 +57,7 @@ func TestDbStore(t *testing.T) { func TestStoreRetention(t *testing.T) { db := NewMock() - store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) + store, err := NewDBStore(utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) err = store.Start(timesource.NewDefaultClock()) diff --git a/waku/v2/node/connectedness_test.go b/waku/v2/node/connectedness_test.go index c53e6522..0fbd258e 100644 --- a/waku/v2/node/connectedness_test.go +++ b/waku/v2/node/connectedness_test.go @@ -69,9 +69,9 @@ func TestConnectionStatusChanges(t *testing.T) { err = node2.Start() require.NoError(t, err) - db, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:") require.NoError(t, err) - dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) // Node3: Relay + Store diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index eba3a602..db4e6ce6 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -182,9 +182,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) { defer wakuNode1.Stop() // NODE2: Filter Client/Store - db, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:") require.NoError(t, err) - dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") diff --git a/waku/v2/protocol/pb/waku_peer_exchange.pb.go b/waku/v2/protocol/pb/waku_peer_exchange.pb.go index 9cda32bb..0dae3226 100644 --- a/waku/v2/protocol/pb/waku_peer_exchange.pb.go +++ b/waku/v2/protocol/pb/waku_peer_exchange.pb.go @@ -219,30 +219,30 @@ func (m *PeerExchangeRPC) GetResponse() *PeerExchangeResponse { } func init() { - proto.RegisterType((*PeerInfo)(nil), "PeerInfo") - proto.RegisterType((*PeerExchangeQuery)(nil), "PeerExchangeQuery") - proto.RegisterType((*PeerExchangeResponse)(nil), "PeerExchangeResponse") - proto.RegisterType((*PeerExchangeRPC)(nil), "PeerExchangeRPC") + proto.RegisterType((*PeerInfo)(nil), "pb.PeerInfo") + proto.RegisterType((*PeerExchangeQuery)(nil), "pb.PeerExchangeQuery") + proto.RegisterType((*PeerExchangeResponse)(nil), "pb.PeerExchangeResponse") + proto.RegisterType((*PeerExchangeRPC)(nil), "pb.PeerExchangeRPC") } func init() { proto.RegisterFile("waku_peer_exchange.proto", fileDescriptor_ce50192ba54b780f) } var fileDescriptor_ce50192ba54b780f = []byte{ - // 211 bytes of a gzipped FileDescriptorProto + // 220 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4f, 0xcc, 0x2e, 0x8d, 0x2f, 0x48, 0x4d, 0x2d, 0x8a, 0x4f, 0xad, 0x48, 0xce, 0x48, 0xcc, 0x4b, 0x4f, 0xd5, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d, 0xf2, 0xcc, 0x4b, 0xcb, - 0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31, - 0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x3d, 0x81, 0xa5, 0xa9, 0x45, 0x95, 0x42, 0x52, - 0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20, 0x38, 0x5f, 0xc9, 0x9e, - 0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x90, 0x3a, 0x17, 0x67, - 0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x4e, 0x3d, 0x98, 0xc5, 0x41, 0x08, 0x39, - 0xa5, 0x3c, 0x2e, 0x7e, 0x14, 0x03, 0x02, 0x9c, 0x85, 0x34, 0xb8, 0x58, 0x0b, 0x41, 0x16, 0x83, - 0x2d, 0xe3, 0x36, 0x12, 0xd2, 0xc3, 0x70, 0x52, 0x10, 0x44, 0x81, 0x90, 0x21, 0x17, 0x47, 0x11, - 0xd4, 0x46, 0x09, 0x26, 0xb0, 0x62, 0x51, 0x3d, 0x6c, 0xce, 0x09, 0x82, 0x2b, 0x73, 0x12, 0x38, - 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63, - 0x48, 0x62, 0x03, 0x07, 0x8c, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x99, 0xd5, 0xbb, 0xb6, 0x34, - 0x01, 0x00, 0x00, + 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d, + 0xf2, 0xcc, 0x4b, 0xcb, 0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4, + 0xe0, 0x09, 0x02, 0x31, 0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x7d, 0x81, 0xa5, 0xa9, + 0x45, 0x95, 0x42, 0x52, 0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20, + 0x38, 0x5f, 0xc9, 0x89, 0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, + 0x90, 0x16, 0x17, 0x67, 0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x1e, 0xbd, 0x82, + 0x24, 0x3d, 0x98, 0xdd, 0x41, 0x08, 0x69, 0xa5, 0x12, 0x2e, 0x7e, 0x14, 0x33, 0x02, 0x9c, 0x85, + 0xb4, 0xb9, 0x58, 0x0b, 0x41, 0x76, 0x83, 0xed, 0xe3, 0x36, 0x12, 0x85, 0x69, 0x45, 0x71, 0x58, + 0x10, 0x44, 0x8d, 0x90, 0x09, 0x17, 0x47, 0x11, 0xd4, 0x5e, 0x09, 0x26, 0xb0, 0x7a, 0x09, 0x74, + 0xf5, 0x30, 0x77, 0x05, 0xc1, 0x55, 0x3a, 0x09, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, + 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x43, 0xc9, 0x18, 0x10, + 0x00, 0x00, 0xff, 0xff, 0x25, 0x86, 0x8e, 0x1d, 0x41, 0x01, 0x00, 0x00, } func (m *PeerInfo) Marshal() (dAtA []byte, err error) { diff --git a/waku/v2/protocol/pb/waku_peer_exchange.proto b/waku/v2/protocol/pb/waku_peer_exchange.proto index 0a77911c..f0b77234 100644 --- a/waku/v2/protocol/pb/waku_peer_exchange.proto +++ b/waku/v2/protocol/pb/waku_peer_exchange.proto @@ -1,5 +1,7 @@ syntax = "proto3"; +package pb; + message PeerInfo { bytes ENR = 1; } diff --git a/waku/v2/protocol/store/utils_test.go b/waku/v2/protocol/store/utils_test.go index e3419cc7..24121b3c 100644 --- a/waku/v2/protocol/store/utils_test.go +++ b/waku/v2/protocol/store/utils_test.go @@ -12,10 +12,10 @@ import ( func MemoryDB(t *testing.T) *persistence.DBStore { var db *sql.DB - db, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:") require.NoError(t, err) - dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) return dbStore