go-waku/waku/persistence/store.go

413 lines
10 KiB
Go
Raw Normal View History

2021-04-13 18:52:57 +00:00
package persistence
2021-04-12 17:59:41 +00:00
import (
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"
2021-04-12 17:59:41 +00:00
2022-05-30 19:13:27 +00:00
"github.com/status-im/go-waku/waku/persistence/migrations"
"github.com/status-im/go-waku/waku/v2/protocol"
2021-04-22 00:09:37 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"go.uber.org/zap"
2021-04-12 17:59:41 +00:00
)
2021-10-25 19:41:08 +00:00
type MessageProvider interface {
GetAll() ([]StoredMessage, error)
Put(env *protocol.Envelope) error
Query(query *pb.HistoryQuery) ([]StoredMessage, error)
MostRecentTimestamp() (int64, error)
2021-10-25 19:41:08 +00:00
Stop()
}
var ErrInvalidCursor = errors.New("invalid cursor")
// WALMode for sqlite.
const WALMode = "wal"
2021-04-22 18:49:52 +00:00
// DBStore is a MessageProvider that has a *sql.DB connection
2021-04-12 17:59:41 +00:00
type DBStore struct {
2021-10-25 19:41:08 +00:00
MessageProvider
db *sql.DB
log *zap.Logger
maxMessages int
maxDuration time.Duration
wg sync.WaitGroup
quit chan struct{}
2021-04-12 17:59:41 +00:00
}
2021-10-25 19:41:08 +00:00
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime int64
2021-10-25 19:41:08 +00:00
Message *pb.WakuMessage
}
2021-10-09 18:18:53 +00:00
// DBOption is an optional setting that can be used to configure the DBStore
2021-04-13 18:52:57 +00:00
type DBOption func(*DBStore) error
2021-04-22 18:49:52 +00:00
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.
2021-04-13 18:52:57 +00:00
func WithDB(db *sql.DB) DBOption {
return func(d *DBStore) error {
d.db = db
return nil
}
}
2021-04-22 18:49:52 +00:00
// WithDriver is a DBOption that will open a *sql.DB connection
2021-04-13 18:52:57 +00:00
func WithDriver(driverName string, datasourceName string) DBOption {
return func(d *DBStore) error {
db, err := sql.Open(driverName, datasourceName)
if err != nil {
return err
}
d.db = db
return nil
}
}
2022-07-25 15:28:17 +00:00
// WithRetentionPolicy is a DBOption that specifies the max number of messages
// to be stored and duration before they're removed from the message store
func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
return func(d *DBStore) error {
d.maxDuration = maxDuration
d.maxMessages = maxMessages
return nil
}
}
2021-04-22 18:49:52 +00:00
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
2021-04-13 18:52:57 +00:00
result := new(DBStore)
result.log = log.Named("dbstore")
result.quit = make(chan struct{})
2021-04-13 18:52:57 +00:00
for _, opt := range options {
err := opt(result)
if err != nil {
return nil, err
}
}
// Disable concurrent access as not supported by the driver
result.db.SetMaxOpenConns(1)
var seq string
var name string
var file string // file will be empty if DB is :memory"
err := result.db.QueryRow("PRAGMA database_list").Scan(&seq, &name, &file)
if err != nil {
return nil, err
}
// readers do not block writers and faster i/o operations
// https://www.sqlite.org/draft/wal.html
// must be set after db is encrypted
var mode string
err = result.db.QueryRow("PRAGMA journal_mode=WAL").Scan(&mode)
if err != nil {
return nil, err
}
if mode != WALMode && file != "" {
return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode)
}
2022-05-30 19:13:27 +00:00
err = migrations.Migrate(result.db)
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
err = result.cleanOlderRecords()
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
result.wg.Add(1)
go result.checkForOlderRecords(10 * time.Second) // is 10s okay?
2021-04-12 17:59:41 +00:00
return result, nil
}
func (d *DBStore) cleanOlderRecords() error {
d.log.Debug("Cleaning older records...")
// Delete older messages
if d.maxDuration > 0 {
start := time.Now()
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
2021-11-06 13:50:38 +00:00
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration)))
if err != nil {
return err
}
elapsed := time.Since(start)
d.log.Debug("deleting older records from the DB", zap.Duration("duration", elapsed))
}
// Limit number of records to a max N
if d.maxMessages > 0 {
start := time.Now()
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET ?)`
_, err := d.db.Exec(sqlStmt, d.maxMessages)
if err != nil {
return err
}
elapsed := time.Since(start)
d.log.Debug("deleting excess records from the DB", zap.Duration("duration", elapsed))
}
return nil
}
func (d *DBStore) checkForOlderRecords(t time.Duration) {
defer d.wg.Done()
ticker := time.NewTicker(t)
defer ticker.Stop()
for {
select {
case <-d.quit:
return
case <-ticker.C:
2022-05-27 19:55:35 +00:00
err := d.cleanOlderRecords()
if err != nil {
d.log.Error("cleaning older records", zap.Error(err))
}
}
}
}
2022-07-25 15:28:17 +00:00
// Stop closes a DB connection
2021-04-12 17:59:41 +00:00
func (d *DBStore) Stop() {
d.quit <- struct{}{}
d.wg.Wait()
2021-04-12 17:59:41 +00:00
d.db.Close()
}
2022-07-25 15:28:17 +00:00
// Put inserts a WakuMessage into the DB
func (d *DBStore) Put(env *protocol.Envelope) error {
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)")
2021-04-12 17:59:41 +00:00
if err != nil {
return err
}
cursor := env.Index()
dbKey := NewDBKey(uint64(cursor.SenderTime), env.PubsubTopic(), env.Index().Digest)
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version)
2021-04-12 17:59:41 +00:00
if err != nil {
return err
}
2022-05-27 18:34:13 +00:00
err = stmt.Close()
if err != nil {
return err
}
2021-04-12 17:59:41 +00:00
return nil
}
2022-07-25 15:28:17 +00:00
// Query retrieves messages from the DB
func (d *DBStore) Query(query *pb.HistoryQuery) ([]StoredMessage, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed))
}()
sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version
FROM message
%s
2022-06-14 15:36:34 +00:00
ORDER BY senderTimestamp %s, pubsubTopic, id %s
LIMIT ?`
var conditions []string
var parameters []interface{}
if query.PubsubTopic != "" {
conditions = append(conditions, "pubsubTopic = ?")
parameters = append(parameters, query.PubsubTopic)
}
if query.StartTime != 0 {
conditions = append(conditions, "id >= ?")
startTimeDBKey := NewDBKey(uint64(query.StartTime), "", []byte{})
parameters = append(parameters, startTimeDBKey.Bytes())
}
if query.EndTime != 0 {
conditions = append(conditions, "id <= ?")
endTimeDBKey := NewDBKey(uint64(query.EndTime), "", []byte{})
parameters = append(parameters, endTimeDBKey.Bytes())
}
if len(query.ContentFilters) != 0 {
var ctPlaceHolder []string
for _, ct := range query.ContentFilters {
if ct.ContentTopic != "" {
ctPlaceHolder = append(ctPlaceHolder, "?")
parameters = append(parameters, ct.ContentTopic)
}
}
conditions = append(conditions, "contentTopic IN ("+strings.Join(ctPlaceHolder, ", ")+")")
}
if query.PagingInfo.Cursor != nil {
var exists bool
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = ?)",
cursorDBKey.Bytes(),
).Scan(&exists)
if err != nil {
return nil, err
}
if exists {
eqOp := ">"
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
eqOp = "<"
}
conditions = append(conditions, fmt.Sprintf("id %s ?", eqOp))
parameters = append(parameters, cursorDBKey.Bytes())
} else {
return nil, ErrInvalidCursor
}
}
conditionStr := ""
if len(conditions) != 0 {
conditionStr = "WHERE " + strings.Join(conditions, " AND ")
}
orderDirection := "ASC"
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
orderDirection = "DESC"
}
2022-06-14 15:36:34 +00:00
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection)
stmt, err := d.db.Prepare(sqlQuery)
if err != nil {
return nil, err
}
defer stmt.Close()
parameters = append(parameters, query.PagingInfo.PageSize)
rows, err := stmt.Query(parameters...)
if err != nil {
return nil, err
}
var result []StoredMessage
for rows.Next() {
record, err := d.GetStoredMessage(rows)
if err != nil {
return nil, err
}
result = append(result, record)
}
defer rows.Close()
return result, nil
}
2022-07-25 15:28:17 +00:00
// MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp
// in the message table
func (d *DBStore) MostRecentTimestamp() (int64, error) {
result := sql.NullInt64{}
err := d.db.QueryRow(`SELECT max(senderTimestamp) FROM message`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return result.Int64, nil
}
2022-07-28 19:17:12 +00:00
// Count returns the number of rows in the message table
func (d *DBStore) Count() (int, error) {
var result int
err := d.db.QueryRow(`SELECT COUNT(*) FROM message`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return result, nil
}
2022-07-25 15:28:17 +00:00
// GetAll returns all the stored WakuMessages
2021-10-25 19:41:08 +00:00
func (d *DBStore) GetAll() ([]StoredMessage, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
d.log.Info("loading records from the DB", zap.Duration("duration", elapsed))
}()
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
2021-10-25 19:41:08 +00:00
var result []StoredMessage
2021-04-12 17:59:41 +00:00
defer rows.Close()
for rows.Next() {
record, err := d.GetStoredMessage(rows)
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
result = append(result, record)
2021-04-12 17:59:41 +00:00
}
d.log.Info("DB returned records", zap.Int("count", len(result)))
2021-04-12 17:59:41 +00:00
err = rows.Err()
if err != nil {
return nil, err
}
return result, nil
}
2022-07-25 15:28:17 +00:00
// GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage`
func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
var id []byte
var receiverTimestamp int64
var senderTimestamp int64
var contentTopic string
var payload []byte
var version uint32
var pubsubTopic string
2022-07-25 15:28:17 +00:00
err := row.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
if err != nil {
d.log.Error("scanning messages from db", zap.Error(err))
return StoredMessage{}, err
}
msg := new(pb.WakuMessage)
msg.ContentTopic = contentTopic
msg.Payload = payload
msg.Timestamp = senderTimestamp
msg.Version = version
record := StoredMessage{
ID: id,
PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp,
Message: msg,
}
return record, nil
}