go-waku/waku/persistence/store.go

197 lines
4.4 KiB
Go
Raw Normal View History

2021-04-13 18:52:57 +00:00
package persistence
2021-04-12 17:59:41 +00:00
import (
"database/sql"
"log"
"time"
2021-04-12 17:59:41 +00:00
2021-04-22 00:09:37 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
2021-04-12 17:59:41 +00:00
)
2021-10-25 19:41:08 +00:00
type MessageProvider interface {
GetAll() ([]StoredMessage, error)
Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error
Stop()
}
2021-04-22 18:49:52 +00:00
// DBStore is a MessageProvider that has a *sql.DB connection
2021-04-12 17:59:41 +00:00
type DBStore struct {
2021-10-25 19:41:08 +00:00
MessageProvider
2021-04-12 17:59:41 +00:00
db *sql.DB
maxMessages int
maxDuration time.Duration
2021-04-12 17:59:41 +00:00
}
2021-10-25 19:41:08 +00:00
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
Message *pb.WakuMessage
}
2021-10-09 18:18:53 +00:00
// DBOption is an optional setting that can be used to configure the DBStore
2021-04-13 18:52:57 +00:00
type DBOption func(*DBStore) error
2021-04-22 18:49:52 +00:00
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.
2021-04-13 18:52:57 +00:00
func WithDB(db *sql.DB) DBOption {
return func(d *DBStore) error {
d.db = db
return nil
}
}
2021-04-22 18:49:52 +00:00
// WithDriver is a DBOption that will open a *sql.DB connection
2021-04-13 18:52:57 +00:00
func WithDriver(driverName string, datasourceName string) DBOption {
return func(d *DBStore) error {
db, err := sql.Open(driverName, datasourceName)
if err != nil {
return err
}
d.db = db
return nil
}
}
func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
return func(d *DBStore) error {
d.maxDuration = maxDuration
d.maxMessages = maxMessages
return nil
}
}
2021-04-22 18:49:52 +00:00
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
func NewDBStore(options ...DBOption) (*DBStore, error) {
2021-04-13 18:52:57 +00:00
result := new(DBStore)
for _, opt := range options {
err := opt(result)
if err != nil {
return nil, err
}
}
err := result.createTable()
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
err = result.cleanOlderRecords()
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
return result, nil
}
func (d *DBStore) createTable() error {
sqlStmt := `CREATE TABLE IF NOT EXISTS message (
2021-04-12 17:59:41 +00:00
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
senderTimestamp REAL NOT NULL,
2021-04-12 17:59:41 +00:00
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
2021-04-12 17:59:41 +00:00
payload BLOB,
version INTEGER NOT NULL DEFAULT 0
) WITHOUT ROWID;`
_, err := d.db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}
func (d *DBStore) cleanOlderRecords() error {
// Delete older messages
if d.maxDuration > 0 {
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
2021-11-06 13:50:38 +00:00
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration)))
if err != nil {
return err
}
}
// Limit number of records to a max N
if d.maxMessages > 0 {
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET 5)`
_, err := d.db.Exec(sqlStmt, d.maxMessages)
if err != nil {
return err
}
}
return nil
}
2021-04-22 18:49:52 +00:00
// Closes a DB connection
2021-04-12 17:59:41 +00:00
func (d *DBStore) Stop() {
d.db.Close()
}
2021-04-22 18:49:52 +00:00
// Inserts a WakuMessage into the DB
func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error {
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)")
2021-04-12 17:59:41 +00:00
if err != nil {
return err
}
_, err = stmt.Exec(cursor.Digest, cursor.ReceiverTime, message.Timestamp, message.ContentTopic, pubsubTopic, message.Payload, message.Version)
2021-04-12 17:59:41 +00:00
if err != nil {
return err
}
return nil
}
2021-04-22 18:49:52 +00:00
// Returns all the stored WakuMessages
2021-10-25 19:41:08 +00:00
func (d *DBStore) GetAll() ([]StoredMessage, error) {
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
2021-04-12 17:59:41 +00:00
if err != nil {
return nil, err
}
2021-10-25 19:41:08 +00:00
var result []StoredMessage
2021-04-12 17:59:41 +00:00
defer rows.Close()
for rows.Next() {
var id []byte
var receiverTimestamp float64
var senderTimestamp float64
2021-04-12 17:59:41 +00:00
var contentTopic string
var payload []byte
var version uint32
var pubsubTopic string
2021-04-12 17:59:41 +00:00
err = rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
2021-04-12 17:59:41 +00:00
if err != nil {
log.Fatal(err)
}
2021-04-22 00:09:37 +00:00
msg := new(pb.WakuMessage)
2021-04-12 17:59:41 +00:00
msg.ContentTopic = contentTopic
msg.Payload = payload
msg.Timestamp = senderTimestamp
2021-04-12 17:59:41 +00:00
msg.Version = version
2021-10-25 19:41:08 +00:00
record := StoredMessage{
ID: id,
PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp,
Message: msg,
}
result = append(result, record)
2021-04-12 17:59:41 +00:00
}
err = rows.Err()
if err != nil {
return nil, err
}
return result, nil
}