mirror of
https://github.com/status-im/go-waku.git
synced 2025-02-26 12:07:34 +00:00
Persistence: Addressing code-climate issues to increase maintainability score (#592)
* chore:fix minor issues reported by code-climate * chore: reduce code duplication * chore: update vendorSHA for nix build due to recent updates to dependencies
This commit is contained in:
parent
6ece3c483b
commit
97f02361d4
@ -17,12 +17,12 @@ import (
|
||||
|
||||
// SignalHandler defines a minimal interface
|
||||
// a signal handler needs to implement.
|
||||
//nolint
|
||||
// nolint
|
||||
type SignalHandler interface {
|
||||
HandleSignal(string)
|
||||
}
|
||||
|
||||
// SignalHandler is a simple callback function that gets called when any signal is received
|
||||
// MobileSignalHandler is a simple callback function that gets called when any signal is received
|
||||
type MobileSignalHandler func([]byte)
|
||||
|
||||
// storing the current mobile signal handler here
|
||||
@ -64,7 +64,7 @@ func send(signalType string, event interface{}) {
|
||||
|
||||
// SetMobileSignalHandler setup geth callback to notify about new signal
|
||||
// used for gomobile builds
|
||||
//nolint
|
||||
// nolint
|
||||
func SetMobileSignalHandler(handler SignalHandler) {
|
||||
mobileSignalHandler = func(data []byte) {
|
||||
if len(data) > 0 {
|
||||
@ -73,6 +73,8 @@ func SetMobileSignalHandler(handler SignalHandler) {
|
||||
}
|
||||
}
|
||||
|
||||
// SetEventCallback is to set a callback in order to receive application
|
||||
// signals which are used to react to asynchronous events in waku.
|
||||
func SetEventCallback(cb unsafe.Pointer) {
|
||||
C.SetEventCallback(cb)
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
|
||||
"github.com/pbnjay/memory"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite"
|
||||
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
|
||||
"github.com/waku-org/go-waku/waku/v2/peers"
|
||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||
@ -43,7 +44,6 @@ import (
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/metrics"
|
||||
"github.com/waku-org/go-waku/waku/persistence"
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite"
|
||||
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
|
@ -1,4 +1,4 @@
|
||||
package migrations
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
@ -10,10 +10,10 @@ import (
|
||||
)
|
||||
|
||||
// Migrate applies migrations.
|
||||
func Migrate(db *sql.DB, driver database.Driver) error {
|
||||
func Migrate(db *sql.DB, driver database.Driver, assetNames []string, assetFunc bindata.AssetFunc) error {
|
||||
return migrateDB(db, bindata.Resource(
|
||||
AssetNames(),
|
||||
Asset,
|
||||
assetNames,
|
||||
assetFunc,
|
||||
), driver)
|
||||
}
|
||||
|
@ -6,88 +6,12 @@ import (
|
||||
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"github.com/golang-migrate/migrate/v4/database/pgx"
|
||||
_ "github.com/jackc/pgx/v5/stdlib"
|
||||
_ "github.com/jackc/pgx/v5/stdlib" // Blank import to register the postgres driver
|
||||
"github.com/waku-org/go-waku/waku/persistence"
|
||||
"github.com/waku-org/go-waku/waku/persistence/migrate"
|
||||
"github.com/waku-org/go-waku/waku/persistence/postgres/migrations"
|
||||
)
|
||||
|
||||
// Queries are the postgresql queries for a given table.
|
||||
type Queries struct {
|
||||
deleteQuery string
|
||||
existsQuery string
|
||||
getQuery string
|
||||
putQuery string
|
||||
queryQuery string
|
||||
prefixQuery string
|
||||
limitQuery string
|
||||
offsetQuery string
|
||||
getSizeQuery string
|
||||
}
|
||||
|
||||
// NewQueries creates a new Postgresql set of queries for the passed table
|
||||
func NewQueries(tbl string, db *sql.DB) (*Queries, error) {
|
||||
err := CreateTable(db, tbl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Queries{
|
||||
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
|
||||
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
|
||||
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
|
||||
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl),
|
||||
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
|
||||
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
|
||||
limitQuery: ` LIMIT %d`,
|
||||
offsetQuery: ` OFFSET %d`,
|
||||
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Delete returns the query for deleting a row.
|
||||
func (q Queries) Delete() string {
|
||||
return q.deleteQuery
|
||||
}
|
||||
|
||||
// Exists returns the query for determining if a row exists.
|
||||
func (q Queries) Exists() string {
|
||||
return q.existsQuery
|
||||
}
|
||||
|
||||
// Get returns the query for getting a row.
|
||||
func (q Queries) Get() string {
|
||||
return q.getQuery
|
||||
}
|
||||
|
||||
// Put returns the query for putting a row.
|
||||
func (q Queries) Put() string {
|
||||
return q.putQuery
|
||||
}
|
||||
|
||||
// Query returns the query for getting multiple rows.
|
||||
func (q Queries) Query() string {
|
||||
return q.queryQuery
|
||||
}
|
||||
|
||||
// Prefix returns the query fragment for getting a rows with a key prefix.
|
||||
func (q Queries) Prefix() string {
|
||||
return q.prefixQuery
|
||||
}
|
||||
|
||||
// Limit returns the query fragment for limiting results.
|
||||
func (q Queries) Limit() string {
|
||||
return q.limitQuery
|
||||
}
|
||||
|
||||
// Offset returns the query fragment for returning rows from a given offset.
|
||||
func (q Queries) Offset() string {
|
||||
return q.offsetQuery
|
||||
}
|
||||
|
||||
// GetSize returns the query for determining the size of a value.
|
||||
func (q Queries) GetSize() string {
|
||||
return q.getSizeQuery
|
||||
}
|
||||
|
||||
// WithDB is a DBOption that lets you use a postgresql DBStore and run migrations
|
||||
func WithDB(dburl string, migrate bool) persistence.DBOption {
|
||||
return func(d *persistence.DBStore) error {
|
||||
@ -127,6 +51,15 @@ func migrationDriver(db *sql.DB) (database.Driver, error) {
|
||||
})
|
||||
}
|
||||
|
||||
// Migrate is the function used for DB migration with postgres driver
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := migrationDriver(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
|
||||
}
|
||||
|
||||
// CreateTable creates the table that will persist the peers
|
||||
func CreateTable(db *sql.DB, tableName string) error {
|
||||
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName)
|
||||
@ -137,10 +70,11 @@ func CreateTable(db *sql.DB, tableName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := migrationDriver(db)
|
||||
// NewQueries creates a new SQL set of queries for the passed table
|
||||
func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) {
|
||||
err := CreateTable(db, tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
return migrations.Migrate(db, migrationDriver)
|
||||
return persistence.CreateQueries(tbl, db), nil
|
||||
}
|
||||
|
80
waku/persistence/sql_queries.go
Normal file
80
waku/persistence/sql_queries.go
Normal file
@ -0,0 +1,80 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Queries are the SQL queries for a given table.
|
||||
type Queries struct {
|
||||
deleteQuery string
|
||||
existsQuery string
|
||||
getQuery string
|
||||
putQuery string
|
||||
queryQuery string
|
||||
prefixQuery string
|
||||
limitQuery string
|
||||
offsetQuery string
|
||||
getSizeQuery string
|
||||
}
|
||||
|
||||
// CreateQueries Function creates a set of queries for an SQL table.
|
||||
// Note: Do not use this function to create queries for a table, rather use <rdb>.NewQueries to create table as well as queries.
|
||||
func CreateQueries(tbl string, db *sql.DB) *Queries {
|
||||
return &Queries{
|
||||
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
|
||||
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
|
||||
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
|
||||
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl),
|
||||
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
|
||||
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
|
||||
limitQuery: ` LIMIT %d`,
|
||||
offsetQuery: ` OFFSET %d`,
|
||||
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
|
||||
}
|
||||
}
|
||||
|
||||
// Delete returns the query for deleting a row.
|
||||
func (q Queries) Delete() string {
|
||||
return q.deleteQuery
|
||||
}
|
||||
|
||||
// Exists returns the query for determining if a row exists.
|
||||
func (q Queries) Exists() string {
|
||||
return q.existsQuery
|
||||
}
|
||||
|
||||
// Get returns the query for getting a row.
|
||||
func (q Queries) Get() string {
|
||||
return q.getQuery
|
||||
}
|
||||
|
||||
// Put returns the query for putting a row.
|
||||
func (q Queries) Put() string {
|
||||
return q.putQuery
|
||||
}
|
||||
|
||||
// Query returns the query for getting multiple rows.
|
||||
func (q Queries) Query() string {
|
||||
return q.queryQuery
|
||||
}
|
||||
|
||||
// Prefix returns the query fragment for getting a rows with a key prefix.
|
||||
func (q Queries) Prefix() string {
|
||||
return q.prefixQuery
|
||||
}
|
||||
|
||||
// Limit returns the query fragment for limiting results.
|
||||
func (q Queries) Limit() string {
|
||||
return q.limitQuery
|
||||
}
|
||||
|
||||
// Offset returns the query fragment for returning rows from a given offset.
|
||||
func (q Queries) Offset() string {
|
||||
return q.offsetQuery
|
||||
}
|
||||
|
||||
// GetSize returns the query for determining the size of a value.
|
||||
func (q Queries) GetSize() string {
|
||||
return q.getSizeQuery
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
|
||||
bindata "github.com/golang-migrate/migrate/v4/source/go_bindata"
|
||||
)
|
||||
|
||||
// Migrate applies migrations.
|
||||
func Migrate(db *sql.DB, driver database.Driver) error {
|
||||
return migrateDB(db, bindata.Resource(
|
||||
AssetNames(),
|
||||
Asset,
|
||||
), driver)
|
||||
}
|
||||
|
||||
// Migrate database using provided resources.
|
||||
func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error {
|
||||
source, err := bindata.WithInstance(resources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err := migrate.NewWithInstance(
|
||||
"go-bindata",
|
||||
source,
|
||||
"gowakudb",
|
||||
driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = m.Up(); err != migrate.ErrNoChange {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -9,86 +9,10 @@ import (
|
||||
"github.com/golang-migrate/migrate/v4/database/sqlite3"
|
||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||
"github.com/waku-org/go-waku/waku/persistence"
|
||||
"github.com/waku-org/go-waku/waku/persistence/migrate"
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
|
||||
)
|
||||
|
||||
// Queries are the sqlite queries for a given table.
|
||||
type Queries struct {
|
||||
deleteQuery string
|
||||
existsQuery string
|
||||
getQuery string
|
||||
putQuery string
|
||||
queryQuery string
|
||||
prefixQuery string
|
||||
limitQuery string
|
||||
offsetQuery string
|
||||
getSizeQuery string
|
||||
}
|
||||
|
||||
// NewQueries creates a new SQLite set of queries for the passed table
|
||||
func NewQueries(tbl string, db *sql.DB) (*Queries, error) {
|
||||
err := CreateTable(db, tbl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Queries{
|
||||
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
|
||||
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
|
||||
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
|
||||
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl),
|
||||
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
|
||||
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
|
||||
limitQuery: ` LIMIT %d`,
|
||||
offsetQuery: ` OFFSET %d`,
|
||||
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Delete returns the query for deleting a row.
|
||||
func (q Queries) Delete() string {
|
||||
return q.deleteQuery
|
||||
}
|
||||
|
||||
// Exists returns the query for determining if a row exists.
|
||||
func (q Queries) Exists() string {
|
||||
return q.existsQuery
|
||||
}
|
||||
|
||||
// Get returns the query for getting a row.
|
||||
func (q Queries) Get() string {
|
||||
return q.getQuery
|
||||
}
|
||||
|
||||
// Put returns the query for putting a row.
|
||||
func (q Queries) Put() string {
|
||||
return q.putQuery
|
||||
}
|
||||
|
||||
// Query returns the query for getting multiple rows.
|
||||
func (q Queries) Query() string {
|
||||
return q.queryQuery
|
||||
}
|
||||
|
||||
// Prefix returns the query fragment for getting a rows with a key prefix.
|
||||
func (q Queries) Prefix() string {
|
||||
return q.prefixQuery
|
||||
}
|
||||
|
||||
// Limit returns the query fragment for limiting results.
|
||||
func (q Queries) Limit() string {
|
||||
return q.limitQuery
|
||||
}
|
||||
|
||||
// Offset returns the query fragment for returning rows from a given offset.
|
||||
func (q Queries) Offset() string {
|
||||
return q.offsetQuery
|
||||
}
|
||||
|
||||
// GetSize returns the query for determining the size of a value.
|
||||
func (q Queries) GetSize() string {
|
||||
return q.getSizeQuery
|
||||
}
|
||||
|
||||
func addSqliteURLDefaults(dburl string) string {
|
||||
if !strings.Contains(dburl, "?") {
|
||||
dburl += "?"
|
||||
@ -144,9 +68,24 @@ func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
|
||||
return db, Migrate, nil
|
||||
}
|
||||
|
||||
func migrationDriver(db *sql.DB) (database.Driver, error) {
|
||||
return sqlite3.WithInstance(db, &sqlite3.Config{
|
||||
MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable,
|
||||
})
|
||||
}
|
||||
|
||||
// Migrate is the function used for DB migration with sqlite driver
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := migrationDriver(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
|
||||
}
|
||||
|
||||
// CreateTable creates the table that will persist the peers
|
||||
func CreateTable(db *sql.DB, tableName string) error {
|
||||
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName)
|
||||
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BLOB);", tableName)
|
||||
_, err := db.Exec(sqlStmt)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -154,16 +93,11 @@ func CreateTable(db *sql.DB, tableName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func migrationDriver(db *sql.DB) (database.Driver, error) {
|
||||
return sqlite3.WithInstance(db, &sqlite3.Config{
|
||||
MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable,
|
||||
})
|
||||
}
|
||||
|
||||
func Migrate(db *sql.DB) error {
|
||||
migrationDriver, err := migrationDriver(db)
|
||||
// NewQueries creates a table if it doesn't exist and a new SQL set of queries for the passed table
|
||||
func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) {
|
||||
err := CreateTable(db, tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
return migrations.Migrate(db, migrationDriver)
|
||||
return persistence.CreateQueries(tbl, db), nil
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// MessageProvider is an interface that provides access to store/retrieve messages from a persistence store.
|
||||
type MessageProvider interface {
|
||||
GetAll() ([]StoredMessage, error)
|
||||
Validate(env *protocol.Envelope) error
|
||||
@ -29,8 +30,13 @@ type MessageProvider interface {
|
||||
Stop()
|
||||
}
|
||||
|
||||
// ErrInvalidCursor indicates that an invalid cursor has been passed to access store
|
||||
var ErrInvalidCursor = errors.New("invalid cursor")
|
||||
|
||||
// ErrFutureMessage indicates that a message with timestamp in future was requested to be stored
|
||||
var ErrFutureMessage = errors.New("message timestamp in the future")
|
||||
|
||||
// ErrMessageTooOld indicates that a message that was too old was requested to be stored.
|
||||
var ErrMessageTooOld = errors.New("message too old")
|
||||
|
||||
// WALMode for sqlite.
|
||||
@ -58,6 +64,7 @@ type DBStore struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// StoredMessage is the format of the message stored in persistence store
|
||||
type StoredMessage struct {
|
||||
ID []byte
|
||||
PubsubTopic string
|
||||
@ -76,6 +83,7 @@ func WithDB(db *sql.DB) DBOption {
|
||||
}
|
||||
}
|
||||
|
||||
// ConnectionPoolOptions is the options to be used for DB connection pooling
|
||||
type ConnectionPoolOptions struct {
|
||||
MaxOpenConnections int
|
||||
MaxIdleConnections int
|
||||
@ -123,6 +131,7 @@ func WithMigrations(migrationFn func(db *sql.DB) error) DBOption {
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultOptions returns the default DBoptions to be used.
|
||||
func DefaultOptions() []DBOption {
|
||||
return []DBOption{}
|
||||
}
|
||||
@ -154,6 +163,7 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Start starts the store server functionality
|
||||
func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
@ -172,17 +182,17 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *DBStore) updateMetrics(ctx context.Context) {
|
||||
func (d *DBStore) updateMetrics(ctx context.Context) {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
defer store.wg.Done()
|
||||
defer d.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
msgCount, err := store.Count()
|
||||
msgCount, err := d.Count()
|
||||
if err != nil {
|
||||
store.log.Error("updating store metrics", zap.Error(err))
|
||||
d.log.Error("updating store metrics", zap.Error(err))
|
||||
} else {
|
||||
metrics.RecordArchiveMessage(ctx, "stored", msgCount)
|
||||
}
|
||||
@ -256,6 +266,7 @@ func (d *DBStore) Stop() {
|
||||
d.db.Close()
|
||||
}
|
||||
|
||||
// Validate validates the message to be stored against possible fradulent conditions.
|
||||
func (d *DBStore) Validate(env *protocol.Envelope) error {
|
||||
n := time.Unix(0, env.Index().ReceiverTime)
|
||||
upperBound := n.Add(MaxTimeVariance)
|
||||
@ -300,21 +311,65 @@ func (d *DBStore) Put(env *protocol.Envelope) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query retrieves messages from the DB
|
||||
func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
elapsed := time.Since(start)
|
||||
d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed))
|
||||
}()
|
||||
func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, conditions []string, parameters []interface{}) ([]string, []interface{}, error) {
|
||||
usesCursor := false
|
||||
if query.PagingInfo.Cursor != nil {
|
||||
usesCursor = true
|
||||
var exists bool
|
||||
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
|
||||
|
||||
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)",
|
||||
cursorDBKey.Bytes(),
|
||||
).Scan(&exists)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if exists {
|
||||
eqOp := ">"
|
||||
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
|
||||
eqOp = "<"
|
||||
}
|
||||
*paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, *paramCnt))
|
||||
|
||||
parameters = append(parameters, cursorDBKey.Bytes())
|
||||
} else {
|
||||
return nil, nil, ErrInvalidCursor
|
||||
}
|
||||
}
|
||||
|
||||
handleTimeParam := func(time int64, op string) {
|
||||
*paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id %s $%d", op, *paramCnt))
|
||||
timeDBKey := NewDBKey(uint64(time), uint64(time), "", []byte{})
|
||||
parameters = append(parameters, timeDBKey.Bytes())
|
||||
}
|
||||
|
||||
if query.StartTime != 0 {
|
||||
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
|
||||
handleTimeParam(query.StartTime, ">=")
|
||||
}
|
||||
}
|
||||
|
||||
if query.EndTime != 0 {
|
||||
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD {
|
||||
handleTimeParam(query.EndTime, "<=")
|
||||
}
|
||||
}
|
||||
return conditions, parameters, nil
|
||||
}
|
||||
|
||||
func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}, error) {
|
||||
sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version
|
||||
FROM message
|
||||
%s
|
||||
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s `
|
||||
FROM message
|
||||
%s
|
||||
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s `
|
||||
|
||||
var conditions []string
|
||||
var parameters []interface{}
|
||||
//var parameters []interface{}
|
||||
parameters := make([]interface{}, 0) //Allocating as a slice so that references get passed rather than value
|
||||
paramCnt := 0
|
||||
|
||||
if query.PubsubTopic != "" {
|
||||
@ -335,53 +390,10 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
||||
conditions = append(conditions, "contentTopic IN ("+strings.Join(ctPlaceHolder, ", ")+")")
|
||||
}
|
||||
|
||||
usesCursor := false
|
||||
if query.PagingInfo.Cursor != nil {
|
||||
usesCursor = true
|
||||
var exists bool
|
||||
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
|
||||
|
||||
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)",
|
||||
cursorDBKey.Bytes(),
|
||||
).Scan(&exists)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if exists {
|
||||
eqOp := ">"
|
||||
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
|
||||
eqOp = "<"
|
||||
}
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, paramCnt))
|
||||
|
||||
parameters = append(parameters, cursorDBKey.Bytes())
|
||||
} else {
|
||||
return nil, nil, ErrInvalidCursor
|
||||
}
|
||||
conditions, parameters, err := d.handleQueryCursor(query, ¶mCnt, conditions, parameters)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
if query.StartTime != 0 {
|
||||
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt))
|
||||
startTimeDBKey := NewDBKey(uint64(query.StartTime), uint64(query.StartTime), "", []byte{})
|
||||
parameters = append(parameters, startTimeDBKey.Bytes())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if query.EndTime != 0 {
|
||||
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD {
|
||||
paramCnt++
|
||||
conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt))
|
||||
endTimeDBKey := NewDBKey(uint64(query.EndTime), uint64(query.EndTime), "", []byte{})
|
||||
parameters = append(parameters, endTimeDBKey.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
conditionStr := ""
|
||||
if len(conditions) != 0 {
|
||||
conditionStr = "WHERE " + strings.Join(conditions, " AND ")
|
||||
@ -393,18 +405,36 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
||||
}
|
||||
|
||||
paramCnt++
|
||||
|
||||
sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt)
|
||||
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection)
|
||||
d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery))
|
||||
|
||||
return sqlQuery, parameters, nil
|
||||
|
||||
}
|
||||
|
||||
// Query retrieves messages from the DB
|
||||
func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
elapsed := time.Since(start)
|
||||
d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed))
|
||||
}()
|
||||
|
||||
sqlQuery, parameters, err := d.prepareQuerySQL(query)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
stmt, err := d.db.Prepare(sqlQuery)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
pageSize := query.PagingInfo.PageSize + 1
|
||||
|
||||
parameters = append(parameters, pageSize)
|
||||
|
||||
measurementStart := time.Now()
|
||||
rows, err := stmt.Query(parameters...)
|
||||
if err != nil {
|
||||
|
@ -10,7 +10,8 @@ import (
|
||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
|
||||
"github.com/waku-org/go-waku/waku/persistence/migrate"
|
||||
sqlitemigrations "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
@ -24,7 +25,7 @@ func Migrate(db *sql.DB) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return migrations.Migrate(db, migrationDriver)
|
||||
return migrate.Migrate(db, migrationDriver, sqlitemigrations.AssetNames(), sqlitemigrations.Asset)
|
||||
}
|
||||
|
||||
func NewMock() *sql.DB {
|
||||
|
Loading…
x
Reference in New Issue
Block a user