2021-04-13 18:52:57 +00:00
package persistence
2021-04-12 17:59:41 +00:00
import (
"database/sql"
2022-05-30 18:48:22 +00:00
"errors"
"fmt"
"strings"
"sync"
2021-11-05 14:27:30 +00:00
"time"
2021-04-12 17:59:41 +00:00
2022-05-30 19:13:27 +00:00
"github.com/status-im/go-waku/waku/persistence/migrations"
2022-05-30 18:48:22 +00:00
"github.com/status-im/go-waku/waku/v2/protocol"
2021-04-22 00:09:37 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/pb"
2021-11-05 14:27:30 +00:00
"github.com/status-im/go-waku/waku/v2/utils"
2022-01-18 18:17:06 +00:00
"go.uber.org/zap"
2021-04-12 17:59:41 +00:00
)
2021-10-25 19:41:08 +00:00
type MessageProvider interface {
GetAll ( ) ( [ ] StoredMessage , error )
2022-05-30 18:48:22 +00:00
Put ( env * protocol . Envelope ) error
Query ( query * pb . HistoryQuery ) ( [ ] StoredMessage , error )
MostRecentTimestamp ( ) ( int64 , error )
2021-10-25 19:41:08 +00:00
Stop ( )
}
2022-05-30 18:48:22 +00:00
// ErrInvalidCursor is returned by Query when the paging cursor does not
// correspond to any stored message.
var ErrInvalidCursor = errors.New("invalid cursor")

// WALMode is the journal mode name expected back from SQLite once
// write-ahead logging has been enabled.
const WALMode = "wal"
2021-04-22 18:49:52 +00:00
// DBStore is a MessageProvider that has a *sql.DB connection
2021-04-12 17:59:41 +00:00
type DBStore struct {
2021-10-25 19:41:08 +00:00
MessageProvider
2022-01-18 18:17:06 +00:00
db * sql . DB
2022-05-30 15:55:30 +00:00
log * zap . Logger
2021-11-05 14:27:30 +00:00
maxMessages int
2021-11-06 13:23:58 +00:00
maxDuration time . Duration
2022-05-30 18:48:22 +00:00
wg sync . WaitGroup
quit chan struct { }
2021-04-12 17:59:41 +00:00
}
2021-10-25 19:41:08 +00:00
type StoredMessage struct {
ID [ ] byte
PubsubTopic string
2022-02-23 15:01:53 +00:00
ReceiverTime int64
2021-10-25 19:41:08 +00:00
Message * pb . WakuMessage
}
2021-10-09 18:18:53 +00:00
// DBOption is an optional setting that can be used to configure the DBStore
2021-04-13 18:52:57 +00:00
type DBOption func ( * DBStore ) error
2021-04-22 18:49:52 +00:00
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.
2021-04-13 18:52:57 +00:00
func WithDB ( db * sql . DB ) DBOption {
return func ( d * DBStore ) error {
d . db = db
return nil
}
}
2021-04-22 18:49:52 +00:00
// WithDriver is a DBOption that will open a *sql.DB connection
2021-04-13 18:52:57 +00:00
func WithDriver ( driverName string , datasourceName string ) DBOption {
return func ( d * DBStore ) error {
db , err := sql . Open ( driverName , datasourceName )
if err != nil {
return err
}
d . db = db
return nil
}
}
2021-11-06 13:23:58 +00:00
func WithRetentionPolicy ( maxMessages int , maxDuration time . Duration ) DBOption {
2021-11-05 14:27:30 +00:00
return func ( d * DBStore ) error {
2021-11-06 13:23:58 +00:00
d . maxDuration = maxDuration
2021-11-05 14:27:30 +00:00
d . maxMessages = maxMessages
return nil
}
}
2021-04-22 18:49:52 +00:00
// Creates a new DB store using the db specified via options.
2021-11-06 13:23:58 +00:00
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
2022-05-30 15:55:30 +00:00
func NewDBStore ( log * zap . Logger , options ... DBOption ) ( * DBStore , error ) {
2021-04-13 18:52:57 +00:00
result := new ( DBStore )
2022-01-18 18:17:06 +00:00
result . log = log . Named ( "dbstore" )
2022-05-30 18:48:22 +00:00
result . quit = make ( chan struct { } )
2021-04-13 18:52:57 +00:00
2021-11-05 14:27:30 +00:00
for _ , opt := range options {
err := opt ( result )
if err != nil {
return nil , err
}
}
2022-05-30 18:48:22 +00:00
// Disable concurrent access as not supported by the driver
result . db . SetMaxOpenConns ( 1 )
var seq string
var name string
var file string // file will be empty if DB is :memory"
err := result . db . QueryRow ( "PRAGMA database_list" ) . Scan ( & seq , & name , & file )
if err != nil {
return nil , err
}
// readers do not block writers and faster i/o operations
// https://www.sqlite.org/draft/wal.html
// must be set after db is encrypted
var mode string
err = result . db . QueryRow ( "PRAGMA journal_mode=WAL" ) . Scan ( & mode )
if err != nil {
return nil , err
}
if mode != WALMode && file != "" {
return nil , fmt . Errorf ( "unable to set journal_mode to WAL. actual mode %s" , mode )
}
2022-05-30 19:13:27 +00:00
err = migrations . Migrate ( result . db )
2021-04-12 17:59:41 +00:00
if err != nil {
return nil , err
}
2021-11-05 14:27:30 +00:00
err = result . cleanOlderRecords ( )
2021-04-12 17:59:41 +00:00
if err != nil {
return nil , err
}
2022-05-30 18:48:22 +00:00
result . wg . Add ( 1 )
go result . checkForOlderRecords ( 10 * time . Second ) // is 10s okay?
2021-04-12 17:59:41 +00:00
return result , nil
}
2021-11-05 14:27:30 +00:00
func ( d * DBStore ) cleanOlderRecords ( ) error {
2022-05-19 21:29:15 +00:00
d . log . Debug ( "Cleaning older records..." )
2021-11-06 13:23:58 +00:00
// Delete older messages
if d . maxDuration > 0 {
2022-05-19 21:29:15 +00:00
start := time . Now ( )
2021-11-05 14:27:30 +00:00
sqlStmt := ` DELETE FROM message WHERE receiverTimestamp < ? `
2021-11-06 13:50:38 +00:00
_ , err := d . db . Exec ( sqlStmt , utils . GetUnixEpochFrom ( time . Now ( ) . Add ( - d . maxDuration ) ) )
2021-11-05 14:27:30 +00:00
if err != nil {
return err
}
2022-05-19 21:29:15 +00:00
elapsed := time . Since ( start )
2022-05-30 15:55:30 +00:00
d . log . Debug ( "deleting older records from the DB" , zap . Duration ( "duration" , elapsed ) )
2021-11-05 14:27:30 +00:00
}
// Limit number of records to a max N
if d . maxMessages > 0 {
2022-05-19 21:29:15 +00:00
start := time . Now ( )
2022-05-19 20:30:41 +00:00
sqlStmt := ` DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET ?) `
2021-11-05 14:27:30 +00:00
_ , err := d . db . Exec ( sqlStmt , d . maxMessages )
if err != nil {
return err
}
2022-05-19 21:29:15 +00:00
elapsed := time . Since ( start )
2022-05-30 15:55:30 +00:00
d . log . Debug ( "deleting excess records from the DB" , zap . Duration ( "duration" , elapsed ) )
2022-05-05 18:11:23 +00:00
}
2021-11-05 14:27:30 +00:00
return nil
}
2022-05-30 18:48:22 +00:00
func ( d * DBStore ) checkForOlderRecords ( t time . Duration ) {
defer d . wg . Done ( )
ticker := time . NewTicker ( t )
defer ticker . Stop ( )
for {
select {
case <- d . quit :
return
case <- ticker . C :
2022-05-27 19:55:35 +00:00
err := d . cleanOlderRecords ( )
if err != nil {
d . log . Error ( "cleaning older records" , zap . Error ( err ) )
}
2022-05-30 18:48:22 +00:00
}
}
}
2021-04-22 18:49:52 +00:00
// Closes a DB connection
2021-04-12 17:59:41 +00:00
func ( d * DBStore ) Stop ( ) {
2022-05-30 18:48:22 +00:00
d . quit <- struct { } { }
d . wg . Wait ( )
2021-04-12 17:59:41 +00:00
d . db . Close ( )
}
2021-04-22 18:49:52 +00:00
// Inserts a WakuMessage into the DB
2022-05-30 18:48:22 +00:00
func ( d * DBStore ) Put ( env * protocol . Envelope ) error {
2021-07-29 15:03:30 +00:00
stmt , err := d . db . Prepare ( "INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)" )
2021-04-12 17:59:41 +00:00
if err != nil {
return err
}
2022-05-30 18:48:22 +00:00
cursor := env . Index ( )
dbKey := NewDBKey ( uint64 ( cursor . SenderTime ) , env . PubsubTopic ( ) , env . Index ( ) . Digest )
_ , err = stmt . Exec ( dbKey . Bytes ( ) , cursor . ReceiverTime , env . Message ( ) . Timestamp , env . Message ( ) . ContentTopic , env . PubsubTopic ( ) , env . Message ( ) . Payload , env . Message ( ) . Version )
2021-04-12 17:59:41 +00:00
if err != nil {
return err
}
2022-05-27 18:34:13 +00:00
err = stmt . Close ( )
if err != nil {
return err
}
2021-04-12 17:59:41 +00:00
return nil
}
2022-05-30 18:48:22 +00:00
// Query returns the stored messages that match the given history query.
//
// The following filters are combined with AND:
//   - PubsubTopic, when not empty, must equal the row's pubsubTopic.
//   - StartTime / EndTime, when not zero, bound the row id from below / above
//     using DB keys built from the time with an empty topic and digest.
//   - ContentFilters, when present, restrict contentTopic to the non-empty
//     content topics listed.
//   - PagingInfo.Cursor, when set, must identify an existing row; only rows
//     with an id after it (before it for a BACKWARD query) are returned.
//
// Rows are ordered by senderTimestamp (descending for a BACKWARD query), then
// pubsubTopic, then id, and at most PagingInfo.PageSize rows are returned.
//
// ErrInvalidCursor is returned when the cursor does not match a stored
// message; any database error is returned as is.
//
// NOTE(review): query.PagingInfo is dereferenced unconditionally, so callers
// presumably always set it - confirm.
func (d *DBStore) Query(query *pb.HistoryQuery) ([]StoredMessage, error) {
	start := time.Now()
	defer func() {
		elapsed := time.Since(start)
		d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed))
	}()

	// The two format verbs are filled in with the WHERE clause and the sort
	// direction; every user supplied value is passed as a bound parameter.
	sqlQuery := ` SELECT id , receiverTimestamp , senderTimestamp , contentTopic , pubsubTopic , payload , version
FROM message
% s
ORDER BY senderTimestamp % s , pubsubTopic , id
LIMIT ? `

	var conditions []string
	var parameters []interface{}

	if query.PubsubTopic != "" {
		conditions = append(conditions, "pubsubTopic = ?")
		parameters = append(parameters, query.PubsubTopic)
	}

	if query.StartTime != 0 {
		conditions = append(conditions, "id >= ?")
		startTimeDBKey := NewDBKey(uint64(query.StartTime), "", []byte{})
		parameters = append(parameters, startTimeDBKey.Bytes())
	}

	if query.EndTime != 0 {
		conditions = append(conditions, "id <= ?")
		endTimeDBKey := NewDBKey(uint64(query.EndTime), "", []byte{})
		parameters = append(parameters, endTimeDBKey.Bytes())
	}

	if len(query.ContentFilters) != 0 {
		var ctPlaceHolder []string
		for _, ct := range query.ContentFilters {
			if ct.ContentTopic != "" {
				ctPlaceHolder = append(ctPlaceHolder, "?")
				parameters = append(parameters, ct.ContentTopic)
			}
		}
		conditions = append(conditions, "contentTopic IN ("+strings.Join(ctPlaceHolder, ", ")+")")
	}

	if query.PagingInfo.Cursor != nil {
		var exists bool
		cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)

		// A cursor is only accepted if it points at a stored message.
		err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = ?)",
			cursorDBKey.Bytes(),
		).Scan(&exists)
		if err != nil {
			return nil, err
		}
		if !exists {
			return nil, ErrInvalidCursor
		}

		eqOp := ">"
		if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
			eqOp = "<"
		}
		conditions = append(conditions, fmt.Sprintf("id %s ?", eqOp))
		parameters = append(parameters, cursorDBKey.Bytes())
	}

	conditionStr := ""
	if len(conditions) != 0 {
		conditionStr = "WHERE " + strings.Join(conditions, " AND ")
	}

	orderDirection := "ASC"
	if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
		orderDirection = "DESC"
	}

	sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection)

	stmt, err := d.db.Prepare(sqlQuery)
	if err != nil {
		return nil, err
	}
	defer stmt.Close()

	// The page size binds to the trailing LIMIT placeholder.
	parameters = append(parameters, query.PagingInfo.PageSize)
	rows, err := stmt.Query(parameters...)
	if err != nil {
		return nil, err
	}
	// Registered before iterating so the rows are released on every return
	// path, including a scan failure inside the loop.
	defer rows.Close()

	var result []StoredMessage
	for rows.Next() {
		record, err := d.GetStoredMessage(rows)
		if err != nil {
			return nil, err
		}
		result = append(result, record)
	}

	// rows.Next returns false both at the end of the result set and on
	// failure; only rows.Err tells the two apart.
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return result, nil
}
// MostRecentTimestamp returns the largest senderTimestamp found in the
// message table. It returns 0 when the aggregate yields NULL or no row, and
// 0 together with the error for any other query failure.
func (d *DBStore) MostRecentTimestamp() (int64, error) {
	var latest sql.NullInt64

	row := d.db.QueryRow(` SELECT max(senderTimestamp) FROM message `)
	if err := row.Scan(&latest); err != nil && err != sql.ErrNoRows {
		return 0, err
	}

	// Int64 stays at its zero value when the scanned column was NULL.
	return latest.Int64, nil
}
2021-04-22 18:49:52 +00:00
// Returns all the stored WakuMessages
2021-10-25 19:41:08 +00:00
func ( d * DBStore ) GetAll ( ) ( [ ] StoredMessage , error ) {
2022-05-19 20:30:41 +00:00
start := time . Now ( )
defer func ( ) {
elapsed := time . Since ( start )
2022-05-30 15:55:30 +00:00
d . log . Info ( "loading records from the DB" , zap . Duration ( "duration" , elapsed ) )
2022-05-19 20:30:41 +00:00
} ( )
2021-07-29 15:03:30 +00:00
rows , err := d . db . Query ( "SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC" )
2021-04-12 17:59:41 +00:00
if err != nil {
return nil , err
}
2021-10-25 19:41:08 +00:00
var result [ ] StoredMessage
2021-04-12 17:59:41 +00:00
defer rows . Close ( )
for rows . Next ( ) {
2022-05-30 18:48:22 +00:00
record , err := d . GetStoredMessage ( rows )
2021-04-12 17:59:41 +00:00
if err != nil {
2022-05-30 18:48:22 +00:00
return nil , err
2021-07-11 18:11:38 +00:00
}
result = append ( result , record )
2021-04-12 17:59:41 +00:00
}
2022-05-30 15:55:30 +00:00
d . log . Info ( "DB returned records" , zap . Int ( "count" , len ( result ) ) )
2022-05-19 20:30:41 +00:00
2021-04-12 17:59:41 +00:00
err = rows . Err ( )
if err != nil {
return nil , err
}
return result , nil
}
2022-05-30 18:48:22 +00:00
// GetStoredMessage scans the current row of rows into a StoredMessage.
//
// The row must provide, in this order: id, receiverTimestamp,
// senderTimestamp, contentTopic, pubsubTopic, payload and version. A scan
// failure is logged and returned together with an empty StoredMessage.
func (d *DBStore) GetStoredMessage(rows *sql.Rows) (StoredMessage, error) {
	var (
		id                []byte
		receiverTimestamp int64
		senderTimestamp   int64
		contentTopic      string
		pubsubTopic       string
		payload           []byte
		version           uint32
	)

	if err := rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version); err != nil {
		d.log.Error("scanning messages from db", zap.Error(err))
		return StoredMessage{}, err
	}

	return StoredMessage{
		ID:           id,
		PubsubTopic:  pubsubTopic,
		ReceiverTime: receiverTimestamp,
		Message: &pb.WakuMessage{
			ContentTopic: contentTopic,
			Payload:      payload,
			Timestamp:    senderTimestamp,
			Version:      version,
		},
	}, nil
}