From 97f02361d4631177966963b91a940bf8dd2973be Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 7 Jul 2023 07:08:23 +0530 Subject: [PATCH] Persistence: Addressing code-climate issues to increase maintainability score (#592) * chore:fix minor issues reported by code-climate * chore: reduce code duplication * chore: update vendorSHA for nix build due to recent updates to dependencies --- mobile/signals.go | 8 +- waku/node.go | 2 +- .../migrations => migrate}/migrate.go | 8 +- waku/persistence/postgres/postgres.go | 98 ++--------- waku/persistence/sql_queries.go | 80 +++++++++ waku/persistence/sqlite/migrations/migrate.go | 40 ----- waku/persistence/sqlite/sqlite.go | 110 +++---------- waku/persistence/store.go | 154 +++++++++++------- waku/persistence/store_test.go | 5 +- 9 files changed, 223 insertions(+), 282 deletions(-) rename waku/persistence/{postgres/migrations => migrate}/migrate.go (82%) create mode 100644 waku/persistence/sql_queries.go delete mode 100644 waku/persistence/sqlite/migrations/migrate.go diff --git a/mobile/signals.go b/mobile/signals.go index 3e0a7957..142802e8 100644 --- a/mobile/signals.go +++ b/mobile/signals.go @@ -17,12 +17,12 @@ import ( // SignalHandler defines a minimal interface // a signal handler needs to implement. -//nolint +// nolint type SignalHandler interface { HandleSignal(string) } -// SignalHandler is a simple callback function that gets called when any signal is received +// MobileSignalHandler is a simple callback function that gets called when any signal is received type MobileSignalHandler func([]byte) // storing the current mobile signal handler here @@ -64,7 +64,7 @@ func send(signalType string, event interface{}) { // SetMobileSignalHandler setup geth callback to notify about new signal // used for gomobile builds -//nolint +// nolint func SetMobileSignalHandler(handler SignalHandler) { mobileSignalHandler = func(data []byte) { if len(data) > 0 { @@ -73,6 +73,8 @@ func SetMobileSignalHandler(handler SignalHandler) { } } +// SetEventCallback is to set a callback in order to receive application +// signals which are used to react to asynchronous events in waku. func SetEventCallback(cb unsafe.Pointer) { C.SetEventCallback(cb) } diff --git a/waku/node.go b/waku/node.go index f779fb45..688f0302 100644 --- a/waku/node.go +++ b/waku/node.go @@ -17,6 +17,7 @@ import ( rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/pbnjay/memory" + "github.com/waku-org/go-waku/waku/persistence/sqlite" wmetrics "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/rendezvous" @@ -43,7 +44,6 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/metrics" "github.com/waku-org/go-waku/waku/persistence" - "github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/filter" diff --git a/waku/persistence/postgres/migrations/migrate.go b/waku/persistence/migrate/migrate.go similarity index 82% rename from waku/persistence/postgres/migrations/migrate.go rename to waku/persistence/migrate/migrate.go index 427e73bf..e90dcea2 100644 --- a/waku/persistence/postgres/migrations/migrate.go +++ b/waku/persistence/migrate/migrate.go @@ -1,4 +1,4 @@ -package migrations +package migrate import ( "database/sql" @@ -10,10 +10,10 @@ import ( ) // Migrate applies migrations. -func Migrate(db *sql.DB, driver database.Driver) error { +func Migrate(db *sql.DB, driver database.Driver, assetNames []string, assetFunc bindata.AssetFunc) error { return migrateDB(db, bindata.Resource( - AssetNames(), - Asset, + assetNames, + assetFunc, ), driver) } diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go index 5f7fda6c..f87ff141 100644 --- a/waku/persistence/postgres/postgres.go +++ b/waku/persistence/postgres/postgres.go @@ -6,88 +6,12 @@ import ( "github.com/golang-migrate/migrate/v4/database" "github.com/golang-migrate/migrate/v4/database/pgx" - _ "github.com/jackc/pgx/v5/stdlib" + _ "github.com/jackc/pgx/v5/stdlib" // Blank import to register the postgres driver "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/migrate" "github.com/waku-org/go-waku/waku/persistence/postgres/migrations" ) -// Queries are the postgresql queries for a given table. -type Queries struct { - deleteQuery string - existsQuery string - getQuery string - putQuery string - queryQuery string - prefixQuery string - limitQuery string - offsetQuery string - getSizeQuery string -} - -// NewQueries creates a new Postgresql set of queries for the passed table -func NewQueries(tbl string, db *sql.DB) (*Queries, error) { - err := CreateTable(db, tbl) - if err != nil { - return nil, err - } - return &Queries{ - deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), - existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), - getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl), - putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl), - queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl), - prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`, - limitQuery: ` LIMIT %d`, - offsetQuery: ` OFFSET %d`, - getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl), - }, nil -} - -// Delete returns the query for deleting a row. -func (q Queries) Delete() string { - return q.deleteQuery -} - -// Exists returns the query for determining if a row exists. -func (q Queries) Exists() string { - return q.existsQuery -} - -// Get returns the query for getting a row. -func (q Queries) Get() string { - return q.getQuery -} - -// Put returns the query for putting a row. -func (q Queries) Put() string { - return q.putQuery -} - -// Query returns the query for getting multiple rows. -func (q Queries) Query() string { - return q.queryQuery -} - -// Prefix returns the query fragment for getting a rows with a key prefix. -func (q Queries) Prefix() string { - return q.prefixQuery -} - -// Limit returns the query fragment for limiting results. -func (q Queries) Limit() string { - return q.limitQuery -} - -// Offset returns the query fragment for returning rows from a given offset. -func (q Queries) Offset() string { - return q.offsetQuery -} - -// GetSize returns the query for determining the size of a value. -func (q Queries) GetSize() string { - return q.getSizeQuery -} - // WithDB is a DBOption that lets you use a postgresql DBStore and run migrations func WithDB(dburl string, migrate bool) persistence.DBOption { return func(d *persistence.DBStore) error { @@ -127,6 +51,15 @@ func migrationDriver(db *sql.DB) (database.Driver, error) { }) } +// Migrate is the function used for DB migration with postgres driver +func Migrate(db *sql.DB) error { + migrationDriver, err := migrationDriver(db) + if err != nil { + return err + } + return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset) +} + // CreateTable creates the table that will persist the peers func CreateTable(db *sql.DB, tableName string) error { sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName) @@ -137,10 +70,11 @@ func CreateTable(db *sql.DB, tableName string) error { return nil } -func Migrate(db *sql.DB) error { - migrationDriver, err := migrationDriver(db) +// NewQueries creates a new SQL set of queries for the passed table +func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) { + err := CreateTable(db, tbl) if err != nil { - return err + return nil, err } - return migrations.Migrate(db, migrationDriver) + return persistence.CreateQueries(tbl, db), nil } diff --git a/waku/persistence/sql_queries.go b/waku/persistence/sql_queries.go new file mode 100644 index 00000000..2329187e --- /dev/null +++ b/waku/persistence/sql_queries.go @@ -0,0 +1,80 @@ +package persistence + +import ( + "database/sql" + "fmt" +) + +// Queries are the SQL queries for a given table. +type Queries struct { + deleteQuery string + existsQuery string + getQuery string + putQuery string + queryQuery string + prefixQuery string + limitQuery string + offsetQuery string + getSizeQuery string +} + +// CreateQueries Function creates a set of queries for an SQL table. +// Note: Do not use this function to create queries for a table, rather use .NewQueries to create table as well as queries. +func CreateQueries(tbl string, db *sql.DB) *Queries { + return &Queries{ + deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), + existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), + getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl), + putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl), + queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl), + prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`, + limitQuery: ` LIMIT %d`, + offsetQuery: ` OFFSET %d`, + getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl), + } +} + +// Delete returns the query for deleting a row. +func (q Queries) Delete() string { + return q.deleteQuery +} + +// Exists returns the query for determining if a row exists. +func (q Queries) Exists() string { + return q.existsQuery +} + +// Get returns the query for getting a row. +func (q Queries) Get() string { + return q.getQuery +} + +// Put returns the query for putting a row. +func (q Queries) Put() string { + return q.putQuery +} + +// Query returns the query for getting multiple rows. +func (q Queries) Query() string { + return q.queryQuery +} + +// Prefix returns the query fragment for getting a rows with a key prefix. +func (q Queries) Prefix() string { + return q.prefixQuery +} + +// Limit returns the query fragment for limiting results. +func (q Queries) Limit() string { + return q.limitQuery +} + +// Offset returns the query fragment for returning rows from a given offset. +func (q Queries) Offset() string { + return q.offsetQuery +} + +// GetSize returns the query for determining the size of a value. +func (q Queries) GetSize() string { + return q.getSizeQuery +} diff --git a/waku/persistence/sqlite/migrations/migrate.go b/waku/persistence/sqlite/migrations/migrate.go deleted file mode 100644 index 427e73bf..00000000 --- a/waku/persistence/sqlite/migrations/migrate.go +++ /dev/null @@ -1,40 +0,0 @@ -package migrations - -import ( - "database/sql" - - "github.com/golang-migrate/migrate/v4" - "github.com/golang-migrate/migrate/v4/database" - - bindata "github.com/golang-migrate/migrate/v4/source/go_bindata" -) - -// Migrate applies migrations. -func Migrate(db *sql.DB, driver database.Driver) error { - return migrateDB(db, bindata.Resource( - AssetNames(), - Asset, - ), driver) -} - -// Migrate database using provided resources. -func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error { - source, err := bindata.WithInstance(resources) - if err != nil { - return err - } - - m, err := migrate.NewWithInstance( - "go-bindata", - source, - "gowakudb", - driver) - if err != nil { - return err - } - - if err = m.Up(); err != migrate.ErrNoChange { - return err - } - return nil -} diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index a0ef8243..947e9d56 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -9,86 +9,10 @@ import ( "github.com/golang-migrate/migrate/v4/database/sqlite3" _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/migrate" "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" ) -// Queries are the sqlite queries for a given table. -type Queries struct { - deleteQuery string - existsQuery string - getQuery string - putQuery string - queryQuery string - prefixQuery string - limitQuery string - offsetQuery string - getSizeQuery string -} - -// NewQueries creates a new SQLite set of queries for the passed table -func NewQueries(tbl string, db *sql.DB) (*Queries, error) { - err := CreateTable(db, tbl) - if err != nil { - return nil, err - } - return &Queries{ - deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), - existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), - getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl), - putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl), - queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl), - prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`, - limitQuery: ` LIMIT %d`, - offsetQuery: ` OFFSET %d`, - getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl), - }, nil -} - -// Delete returns the query for deleting a row. -func (q Queries) Delete() string { - return q.deleteQuery -} - -// Exists returns the query for determining if a row exists. -func (q Queries) Exists() string { - return q.existsQuery -} - -// Get returns the query for getting a row. -func (q Queries) Get() string { - return q.getQuery -} - -// Put returns the query for putting a row. -func (q Queries) Put() string { - return q.putQuery -} - -// Query returns the query for getting multiple rows. -func (q Queries) Query() string { - return q.queryQuery -} - -// Prefix returns the query fragment for getting a rows with a key prefix. -func (q Queries) Prefix() string { - return q.prefixQuery -} - -// Limit returns the query fragment for limiting results. -func (q Queries) Limit() string { - return q.limitQuery -} - -// Offset returns the query fragment for returning rows from a given offset. -func (q Queries) Offset() string { - return q.offsetQuery -} - -// GetSize returns the query for determining the size of a value. -func (q Queries) GetSize() string { - return q.getSizeQuery -} - func addSqliteURLDefaults(dburl string) string { if !strings.Contains(dburl, "?") { dburl += "?" @@ -144,9 +68,24 @@ func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { return db, Migrate, nil } +func migrationDriver(db *sql.DB) (database.Driver, error) { + return sqlite3.WithInstance(db, &sqlite3.Config{ + MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, + }) +} + +// Migrate is the function used for DB migration with sqlite driver +func Migrate(db *sql.DB) error { + migrationDriver, err := migrationDriver(db) + if err != nil { + return err + } + return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset) +} + // CreateTable creates the table that will persist the peers func CreateTable(db *sql.DB, tableName string) error { - sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName) + sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BLOB);", tableName) _, err := db.Exec(sqlStmt) if err != nil { return err @@ -154,16 +93,11 @@ func CreateTable(db *sql.DB, tableName string) error { return nil } -func migrationDriver(db *sql.DB) (database.Driver, error) { - return sqlite3.WithInstance(db, &sqlite3.Config{ - MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, - }) -} - -func Migrate(db *sql.DB) error { - migrationDriver, err := migrationDriver(db) +// NewQueries creates a table if it doesn't exist and a new SQL set of queries for the passed table +func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) { + err := CreateTable(db, tbl) if err != nil { - return err + return nil, err } - return migrations.Migrate(db, migrationDriver) + return persistence.CreateQueries(tbl, db), nil } diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 339cdbee..f9332db3 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" ) +// MessageProvider is an interface that provides access to store/retrieve messages from a persistence store. type MessageProvider interface { GetAll() ([]StoredMessage, error) Validate(env *protocol.Envelope) error @@ -29,8 +30,13 @@ type MessageProvider interface { Stop() } +// ErrInvalidCursor indicates that an invalid cursor has been passed to access store var ErrInvalidCursor = errors.New("invalid cursor") + +// ErrFutureMessage indicates that a message with timestamp in future was requested to be stored var ErrFutureMessage = errors.New("message timestamp in the future") + +// ErrMessageTooOld indicates that a message that was too old was requested to be stored. var ErrMessageTooOld = errors.New("message too old") // WALMode for sqlite. @@ -58,6 +64,7 @@ type DBStore struct { cancel context.CancelFunc } +// StoredMessage is the format of the message stored in persistence store type StoredMessage struct { ID []byte PubsubTopic string @@ -76,6 +83,7 @@ func WithDB(db *sql.DB) DBOption { } } +// ConnectionPoolOptions is the options to be used for DB connection pooling type ConnectionPoolOptions struct { MaxOpenConnections int MaxIdleConnections int @@ -123,6 +131,7 @@ func WithMigrations(migrationFn func(db *sql.DB) error) DBOption { } } +// DefaultOptions returns the default DBoptions to be used. func DefaultOptions() []DBOption { return []DBOption{} } @@ -154,6 +163,7 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { return result, nil } +// Start starts the store server functionality func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error { ctx, cancel := context.WithCancel(ctx) @@ -172,17 +182,17 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e return nil } -func (store *DBStore) updateMetrics(ctx context.Context) { +func (d *DBStore) updateMetrics(ctx context.Context) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - defer store.wg.Done() + defer d.wg.Done() for { select { case <-ticker.C: - msgCount, err := store.Count() + msgCount, err := d.Count() if err != nil { - store.log.Error("updating store metrics", zap.Error(err)) + d.log.Error("updating store metrics", zap.Error(err)) } else { metrics.RecordArchiveMessage(ctx, "stored", msgCount) } @@ -256,6 +266,7 @@ func (d *DBStore) Stop() { d.db.Close() } +// Validate validates the message to be stored against possible fradulent conditions. func (d *DBStore) Validate(env *protocol.Envelope) error { n := time.Unix(0, env.Index().ReceiverTime) upperBound := n.Add(MaxTimeVariance) @@ -300,21 +311,65 @@ func (d *DBStore) Put(env *protocol.Envelope) error { return nil } -// Query retrieves messages from the DB -func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error) { - start := time.Now() - defer func() { - elapsed := time.Since(start) - d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed)) - }() +func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, conditions []string, parameters []interface{}) ([]string, []interface{}, error) { + usesCursor := false + if query.PagingInfo.Cursor != nil { + usesCursor = true + var exists bool + cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) + err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)", + cursorDBKey.Bytes(), + ).Scan(&exists) + + if err != nil { + return nil, nil, err + } + + if exists { + eqOp := ">" + if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { + eqOp = "<" + } + *paramCnt++ + conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, *paramCnt)) + + parameters = append(parameters, cursorDBKey.Bytes()) + } else { + return nil, nil, ErrInvalidCursor + } + } + + handleTimeParam := func(time int64, op string) { + *paramCnt++ + conditions = append(conditions, fmt.Sprintf("id %s $%d", op, *paramCnt)) + timeDBKey := NewDBKey(uint64(time), uint64(time), "", []byte{}) + parameters = append(parameters, timeDBKey.Bytes()) + } + + if query.StartTime != 0 { + if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { + handleTimeParam(query.StartTime, ">=") + } + } + + if query.EndTime != 0 { + if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD { + handleTimeParam(query.EndTime, "<=") + } + } + return conditions, parameters, nil +} + +func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}, error) { sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version - FROM message - %s - ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s ` + FROM message + %s + ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s ` var conditions []string - var parameters []interface{} + //var parameters []interface{} + parameters := make([]interface{}, 0) //Allocating as a slice so that references get passed rather than value paramCnt := 0 if query.PubsubTopic != "" { @@ -335,53 +390,10 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err conditions = append(conditions, "contentTopic IN ("+strings.Join(ctPlaceHolder, ", ")+")") } - usesCursor := false - if query.PagingInfo.Cursor != nil { - usesCursor = true - var exists bool - cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) - - err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)", - cursorDBKey.Bytes(), - ).Scan(&exists) - - if err != nil { - return nil, nil, err - } - - if exists { - eqOp := ">" - if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { - eqOp = "<" - } - paramCnt++ - conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, paramCnt)) - - parameters = append(parameters, cursorDBKey.Bytes()) - } else { - return nil, nil, ErrInvalidCursor - } + conditions, parameters, err := d.handleQueryCursor(query, ¶mCnt, conditions, parameters) + if err != nil { + return "", nil, err } - - if query.StartTime != 0 { - if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { - paramCnt++ - conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt)) - startTimeDBKey := NewDBKey(uint64(query.StartTime), uint64(query.StartTime), "", []byte{}) - parameters = append(parameters, startTimeDBKey.Bytes()) - } - - } - - if query.EndTime != 0 { - if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD { - paramCnt++ - conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt)) - endTimeDBKey := NewDBKey(uint64(query.EndTime), uint64(query.EndTime), "", []byte{}) - parameters = append(parameters, endTimeDBKey.Bytes()) - } - } - conditionStr := "" if len(conditions) != 0 { conditionStr = "WHERE " + strings.Join(conditions, " AND ") @@ -393,18 +405,36 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err } paramCnt++ + sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt) sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection) + d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery)) + return sqlQuery, parameters, nil + +} + +// Query retrieves messages from the DB +func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error) { + start := time.Now() + defer func() { + elapsed := time.Since(start) + d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed)) + }() + + sqlQuery, parameters, err := d.prepareQuerySQL(query) + if err != nil { + return nil, nil, err + } stmt, err := d.db.Prepare(sqlQuery) if err != nil { return nil, nil, err } defer stmt.Close() - pageSize := query.PagingInfo.PageSize + 1 parameters = append(parameters, pageSize) + measurementStart := time.Now() rows, err := stmt.Query(parameters...) if err != nil { diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 08dceb6b..0300fb30 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -10,7 +10,8 @@ import ( _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" + "github.com/waku-org/go-waku/waku/persistence/migrate" + sqlitemigrations "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -24,7 +25,7 @@ func Migrate(db *sql.DB) error { if err != nil { return err } - return migrations.Migrate(db, migrationDriver) + return migrate.Migrate(db, migrationDriver, sqlitemigrations.AssetNames(), sqlitemigrations.Asset) } func NewMock() *sql.DB {