chore: use nwaku's db format (#965)

This commit is contained in:
richΛrd 2024-01-03 12:49:54 -04:00 committed by GitHub
parent a327e56377
commit 846183d515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 385 additions and 114 deletions

View File

@ -109,7 +109,7 @@ func Execute(options NodeOptions) error {
logger := utils.Logger().With(logging.HostID("node", id))
var db *sql.DB
var migrationFn func(*sql.DB) error
var migrationFn func(*sql.DB, *zap.Logger) error
if requiresDB(options) && options.Store.Migration {
dbSettings := dbutils.DBSettings{}
db, migrationFn, err = dbutils.ParseURL(options.Store.DatabaseURL, dbSettings, logger)

View File

@ -13,6 +13,7 @@ import (
"time"
"unsafe"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -202,7 +203,7 @@ func NewNode(instance *WakuInstance, configJSON string) error {
if *config.EnableStore {
var db *sql.DB
var migrationFn func(*sql.DB) error
var migrationFn func(*sql.DB, *zap.Logger) error
db, migrationFn, err = dbutils.ParseURL(*config.DatabaseURL, dbutils.DBSettings{}, utils.Logger())
if err != nil {
return err

View File

@ -1,56 +0,0 @@
package persistence
import (
"encoding/binary"
"errors"
"github.com/waku-org/go-waku/waku/v2/hash"
)
const (
// TimestampLength is the byte size of the big-endian timestamp prefix of a DBKey.
TimestampLength = 8
// HashLength is the byte size of a SHA-256 digest.
HashLength = 32
// DigestLength is the byte size of the message digest portion of a DBKey.
DigestLength = HashLength
// PubsubTopicLength is the byte size of the hashed pubsub topic portion of a DBKey.
PubsubTopicLength = HashLength
// DBKeyLength is the total size of a serialized DBKey:
// timestamp | pubsub-topic hash | digest.
DBKeyLength = TimestampLength + PubsubTopicLength + DigestLength
)
// Hash is a fixed-size (SHA-256 length) digest value.
type Hash [HashLength]byte
var (
// ErrInvalidByteSize is returned when DBKey can't be created
// from a byte slice because it has invalid length.
ErrInvalidByteSize = errors.New("byte slice has invalid length")
)
// DBKey key to be stored in a db.
// Layout of raw: 8-byte big-endian timestamp, then the 32-byte pubsub-topic
// hash, then the 32-byte message digest (see NewDBKey).
type DBKey struct {
raw []byte
}
// Bytes returns a bytes representation of the DBKey.
func (k *DBKey) Bytes() []byte {
return k.raw
}
// NewDBKey creates a new DBKey with the given values.
// The sender timestamp is preferred for the key prefix; when it is zero the
// receiver timestamp is used instead. An empty pubsub topic is encoded as
// PubsubTopicLength zero bytes rather than a hash.
func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey {
	timestamp := senderTimestamp
	if timestamp == 0 {
		timestamp = receiverTimestamp
	}

	topicHash := make([]byte, PubsubTopicLength)
	if pubsubTopic != "" {
		topicHash = hash.SHA256([]byte(pubsubTopic))
	}

	raw := make([]byte, DBKeyLength)
	binary.BigEndian.PutUint64(raw, timestamp)
	copy(raw[TimestampLength:], topicHash)
	copy(raw[TimestampLength+PubsubTopicLength:], digest)

	return &DBKey{raw: raw}
}

View File

@ -14,11 +14,19 @@ func Migrate(db *sql.DB, driver database.Driver, assetNames []string, assetFunc
return migrateDB(db, bindata.Resource(
assetNames,
assetFunc,
), driver)
), driver, true)
}
// Migrate applies migrations.
func MigrateDown(db *sql.DB, driver database.Driver, assetNames []string, assetFunc bindata.AssetFunc) error {
return migrateDB(db, bindata.Resource(
assetNames,
assetFunc,
), driver, false)
}
// Migrate database using provided resources.
func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error {
func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver, up bool) error {
source, err := bindata.WithInstance(resources)
if err != nil {
return err
@ -33,7 +41,12 @@ func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Drive
return err
}
if err = m.Up(); err != migrate.ErrNoChange {
fn := m.Up
if !up {
fn = m.Down
}
if err = fn(); err != migrate.ErrNoChange {
return err
}
return nil

View File

@ -8,6 +8,8 @@
// 3_rendezvous.up.sql (181B)
// 4_signed_peer_record.down.sql (166B)
// 4_signed_peer_record.up.sql (178B)
// 5_nwaku_schema.down.sql (891B)
// 5_nwaku_schema.up.sql (838B)
// doc.go (74B)
package migrations
@ -237,6 +239,46 @@ func _4_signed_peer_recordUpSql() (*asset, error) {
return a, nil
}
var __5_nwaku_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x53\x4f\x8f\xba\x30\x10\x3d\xd3\x4f\x31\x47\x49\x7a\xf8\xfd\xf6\xea\xa9\xca\x68\xc8\x62\x31\xb5\x26\xeb\x89\xa0\x34\xa6\x89\x80\x4b\xab\x89\xdf\x7e\x83\xf5\x0f\x14\xdd\x3d\x41\xde\x1b\xde\x9b\x79\xc3\xb0\x44\xa2\x00\xc9\x26\x09\x42\xa9\x8c\xc9\xf7\x0a\x04\x72\xb6\x40\x90\xe9\x1d\xc9\xea\x43\x31\x26\x91\x48\x97\x10\xf3\x08\xbf\x40\x67\xd6\x78\xc0\xf7\x49\x35\x97\x31\x21\x53\x81\x4c\xa2\xa7\x38\x22\x81\x2e\x60\xb2\x91\xc8\x28\x09\x1a\xb5\x53\xfa\xac\x1a\xa9\x4b\x65\x6c\x5e\x1e\x61\x12\xcf\x63\x2e\x81\xa7\x12\xf8\x3a\x49\x28\x09\x8c\xaa\x8a\xdf\x2b\x76\x75\x65\x55\x65\x65\x7d\xd4\x3b\x27\xdd\x65\x8f\xa7\xad\x39\x6d\xdf\x91\xf9\xe5\x50\xe7\xcf\x86\xce\xaa\x31\xba\xae\x20\xe6\x12\xe7\x28\x1e\xa5\x10\xe1\x8c\xad\x13\x09\xff\x28\x09\x96\x22\x5e\x30\xb1\x81\x4f\xdc\xc0\x48\x17\x14\x3a\x16\x21\x09\x9f\xa3\xbb\x44\xee\xd1\xf9\x83\xa4\xfc\x4e\x8d\x3c\x2a\x1c\xbf\x56\x18\xc6\xd5\xd1\x18\x90\xbe\x8a\xce\x4a\xb3\xcf\xfe\x77\xbf\xe9\x25\xc7\x56\xd3\xde\x28\x0e\xf0\xbb\xbe\x82\xba\x68\x9f\xaf\x0d\x3e\xde\x1a\x44\xe8\x3b\x38\xc4\xb7\x70\xa8\x2e\xae\x2f\x6d\x9c\x31\x5f\xa1\x90\xed\x52\x1e\x3f\xe2\x35\xf7\xc1\xc8\x03\x2d\x0a\xdd\x0e\x7a\xe6\x14\x6e\xcb\xa7\x70\xdb\x7a\x48\x56\x98\xe0\x54\x42\xab\x6d\x6c\xdd\xa8\x82\x59\x0a\x76\x28\x96\xfe\x2d\x46\x66\x22\x5d\xf4\xcf\xc6\x9d\x49\xef\x1e\x1c\xf1\x13\x00\x00\xff\xff\xdf\x9a\x72\x0e\x7b\x03\x00\x00")
func _5_nwaku_schemaDownSqlBytes() ([]byte, error) {
return bindataRead(
__5_nwaku_schemaDownSql,
"5_nwaku_schema.down.sql",
)
}
func _5_nwaku_schemaDownSql() (*asset, error) {
bytes, err := _5_nwaku_schemaDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "5_nwaku_schema.down.sql", size: 891, mode: os.FileMode(0664), modTime: time.Unix(1704298105, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x41, 0xe2, 0x4, 0x10, 0x40, 0x1d, 0x4b, 0xcc, 0x5b, 0x7b, 0x36, 0x78, 0x23, 0x8c, 0x7, 0xdc, 0x51, 0x63, 0xe9, 0x9e, 0x19, 0x78, 0xe2, 0x22, 0xee, 0x51, 0xa8, 0x26, 0x85, 0x48, 0x41, 0xb2}}
return a, nil
}
var __5_nwaku_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x94\x93\x41\x8f\xda\x30\x10\x85\xef\xfe\x15\xef\x08\x92\x57\x6a\x7b\xe5\x64\x16\xef\x36\x6a\x70\x56\x5e\xaf\x54\x4e\xc8\xe0\xd1\x62\x29\xc4\x69\xec\x80\xf8\xf7\x55\xc4\x06\x02\x0d\x87\x1e\x72\x99\x79\x79\x33\xf9\xe6\x45\xe4\x46\x6a\x18\x31\xcf\x25\xf6\x14\xa3\xfd\x24\x68\xa9\xc4\x52\xc2\x14\x7d\x65\x1d\x4a\x37\x63\x6c\xa1\x8b\x37\x64\x6a\x21\x7f\x5f\x1a\x91\x2a\x47\x8d\xf1\x7b\x8a\xc9\xee\xeb\xd9\x98\xa6\xa1\x2d\xf9\xc3\x23\x95\x5f\xef\xe3\xe7\xfa\xfb\x48\xed\xc7\x8c\xb1\x67\x2d\x85\x91\x77\xfb\x4d\x18\x50\xb7\x9b\xd8\x6e\x4c\xa8\xfd\x16\xf3\x95\x91\x02\xaa\x30\x50\x1f\x79\xce\x19\xb0\x0d\x55\xa2\x2a\x3d\x6a\xd7\xf6\x54\x06\xeb\xce\x9d\xae\x70\xa0\x26\xfa\x50\x21\x53\x46\xbe\x4a\x7d\x11\x63\x21\x5f\xc4\x47\x6e\xf0\xad\x53\xa5\xfe\x03\x30\xcf\x5e\x33\x65\x6e\x3c\xfd\xc0\xee\x6b\xd1\x9f\x36\xee\xbe\x8a\x78\x7a\x82\xa2\x63\x79\x82\x75\x8e\x1c\x47\xda\xf9\x88\xa3\x2f\x4b\x6c\x08\x75\xa8\xdb\xd2\x26\x72\x38\xfa\xb4\x83\xc5\x36\xb4\x55\xa2\x06\x07\x5b\xb6\xc4\x80\x98\x42\x43\x4e\xa4\xb1\xc1\x6f\x3a\x5b\x0a\xbd\xc2\x2f\xb9\xc2\x64\x30\x79\xca\xa6\x57\x80\x3d\xd7\x14\x51\x28\x2c\x7b\x90\xbd\xef\x74\x76\x2f\xfc\xd3\x52\x73\xba\xd1\x0e\x99\xf2\x21\x7f\x7e\x59\x8f\xc3\xbb\x6e\x68\xa6\xde\xa5\x36\x1d\xcd\x4b\x84\x26\x37\x2f\xdc\x79\x9d\xcf\xc1\xfb\x33\xf0\x2b\xe9\xce\x91\x0f\x79\x5e\x87\x4d\xd9\xbb\xcc\xe5\xb3\xc1\x7f\x39\xdf\x05\xf6\xec\xdf\x3d\xff\xa4\x94\xbd\xe8\x62\x39\xf6\x07\xdc\x84\xf1\xdc\xf8\x1b\x00\x00\xff\xff\xf2\x0c\xa4\xf2\x46\x03\x00\x00")
func _5_nwaku_schemaUpSqlBytes() ([]byte, error) {
return bindataRead(
__5_nwaku_schemaUpSql,
"5_nwaku_schema.up.sql",
)
}
func _5_nwaku_schemaUpSql() (*asset, error) {
bytes, err := _5_nwaku_schemaUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "5_nwaku_schema.up.sql", size: 838, mode: os.FileMode(0664), modTime: time.Unix(1704298834, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6a, 0x3b, 0xf4, 0xf4, 0xf8, 0xa1, 0x9e, 0xa9, 0x6b, 0x6e, 0x24, 0xe3, 0x2b, 0x26, 0xce, 0x87, 0x2d, 0xdb, 0xea, 0x64, 0x62, 0x99, 0x87, 0xdc, 0x1b, 0x88, 0xb9, 0x35, 0x5f, 0xf7, 0x6c, 0x6e}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -364,6 +406,10 @@ var _bindata = map[string]func() (*asset, error){
"4_signed_peer_record.up.sql": _4_signed_peer_recordUpSql,
"5_nwaku_schema.down.sql": _5_nwaku_schemaDownSql,
"5_nwaku_schema.up.sql": _5_nwaku_schemaUpSql,
"doc.go": docGo,
}
@ -416,6 +462,8 @@ var _bintree = &bintree{nil, map[string]*bintree{
"3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}},
"4_signed_peer_record.down.sql": &bintree{_4_signed_peer_recordDownSql, map[string]*bintree{}},
"4_signed_peer_record.up.sql": &bintree{_4_signed_peer_recordUpSql, map[string]*bintree{}},
"5_nwaku_schema.down.sql": &bintree{_5_nwaku_schemaDownSql, map[string]*bintree{}},
"5_nwaku_schema.up.sql": &bintree{_5_nwaku_schemaUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}

View File

@ -0,0 +1,25 @@
-- 5_nwaku_schema.down.sql (postgres): revert the nwaku-compatible schema back
-- to the previous go-waku message schema.
ALTER TABLE message RENAME TO message_old;
-- Remove the nwaku-style indexes before recreating the old ones.
DROP INDEX i_ts;
DROP INDEX i_query;
CREATE TABLE message (
id BYTEA,
receiverTimestamp BIGINT NOT NULL,
senderTimestamp BIGINT NOT NULL,
contentTopic BYTEA NOT NULL,
pubsubTopic BYTEA NOT NULL,
payload BYTEA,
version INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id, pubsubTopic)
);
CREATE INDEX message_senderTimestamp ON message(senderTimestamp);
CREATE INDEX message_receiverTimestamp ON message(receiverTimestamp);
CREATE INDEX i_msg_1 ON message(contentTopic ASC, pubsubTopic ASC, senderTimestamp ASC, id ASC);
CREATE INDEX i_msg_2 ON message(contentTopic DESC, pubsubTopic DESC, senderTimestamp DESC, id DESC);
-- Copy the data back, mapping storedAt -> receiverTimestamp and
-- timestamp -> senderTimestamp (inverse of the up migration).
-- Note: fixed inconsistent identifier casing (contentTOpic -> contentTopic).
INSERT INTO message(id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version)
SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version
FROM message_old;
DROP TABLE message_old;

View File

@ -0,0 +1,27 @@
-- 5_nwaku_schema.up.sql (postgres): migrate the go-waku message table to the
-- schema used by nwaku so both implementations can share one database.
ALTER TABLE message RENAME TO message_old;
-- Drop the old go-waku indexes; the nwaku-style ones are created below.
DROP INDEX message_senderTimestamp;
DROP INDEX message_receiverTimestamp;
DROP INDEX i_msg_1;
DROP INDEX i_msg_2;
CREATE TABLE message (
pubsubTopic BYTEA NOT NULL,
contentTopic BYTEA NOT NULL,
payload BYTEA,
version INTEGER NOT NULL DEFAULT 0,
timestamp BIGINT NOT NULL,
id BYTEA,
messageHash BYTEA, -- Newly added; for migrated rows the INSERT below fills this with the old id (no real hash exists yet)
storedAt BIGINT NOT NULL,
PRIMARY KEY (messageHash)
);
CREATE INDEX i_ts ON Message (storedAt);
CREATE INDEX i_query ON Message (contentTopic, pubsubTopic, storedAt, id);
-- Copy the data, mapping senderTimestamp -> timestamp and
-- receiverTimestamp -> storedAt; id doubles as the messageHash.
INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt)
SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, id, receiverTimestamp
FROM message_old;
DROP TABLE message_old;

View File

@ -25,12 +25,12 @@ func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) {
func migrationDriver(db *sql.DB) (database.Driver, error) {
return pgx.WithInstance(db, &pgx.Config{
MigrationsTable: "gowaku_" + pgx.DefaultMigrationsTable,
MigrationsTable: pgx.DefaultMigrationsTable,
})
}
// Migrations is the function used for DB migration with postgres driver
func Migrations(db *sql.DB) error {
func Migrations(db *sql.DB, logger *zap.Logger) error {
migrationDriver, err := migrationDriver(db)
if err != nil {
return err

View File

@ -8,6 +8,8 @@
// 3_rendezvous.up.sql (204B)
// 4_signed_peer_record.down.sql (186B)
// 4_signed_peer_record.up.sql (197B)
// 5_nwaku_schema.down.sql (927B)
// 5_nwaku_schema.up.sql (862B)
// doc.go (74B)
package migrations
@ -237,6 +239,46 @@ func _4_signed_peer_recordUpSql() (*asset, error) {
return a, nil
}
var __5_nwaku_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x53\x41\xcf\xa2\x30\x10\x3d\xd3\x5f\x31\x47\x49\x7a\xd8\xdd\x2b\xa7\x0a\xfd\x76\x9b\xc5\xf6\x4b\xa9\x71\x3d\x11\xb4\x8d\x69\x22\xe0\x52\x34\xeb\xbf\xdf\x08\xe2\x07\x45\xe3\x09\xf2\x1e\xbc\x37\xf3\x66\x86\xa4\x8a\x4a\x50\x64\x99\x52\x28\x8d\x73\xc5\xc1\x80\xa4\x9c\xac\x28\x28\x31\x20\x79\x7d\xd4\x11\x4a\xa4\xf8\x04\xc6\x13\xfa\x07\x6c\xde\x3a\x0f\xf8\x7b\x36\xcd\x35\x42\x28\x96\x94\x28\xea\x29\x2e\x50\x60\x35\x2c\x53\xb1\xc4\x28\x68\xcc\xde\xd8\x8b\x69\x94\x2d\x8d\x6b\x8b\xf2\x04\x8c\x2b\xfa\x93\x4a\xe0\x42\x01\x5f\xa7\x29\x46\x81\x33\x95\x7e\xf3\xc9\xbe\xae\x5a\x53\xb5\xaa\x3e\xd9\x7d\xa7\x3d\x26\x4f\xe7\x9d\x3b\xef\x5e\x70\xc5\xf5\x58\x17\x8f\x7a\x2e\xa6\x71\xb6\xae\x66\x16\x90\xd0\x0f\xb2\x4e\x15\x7c\xc3\x28\x88\x05\xcf\x94\x24\x8c\xab\xa1\x29\x56\x69\xf3\x0f\x3e\x25\x5b\x11\xb9\x85\xdf\x74\x0b\x0b\xab\x31\x8c\x8c\x43\x14\xc2\x86\xa9\x5f\x62\xad\x40\x8a\x0d\x4b\xbe\xe2\xe9\x53\x1b\xe2\xf5\x9b\x15\x7c\xa0\x16\x1e\x15\x46\xcf\x15\xe6\x99\x8e\x34\x66\xa4\xaf\x62\xf3\xd2\x1d\xf2\xef\xe3\x7f\x26\xe1\x92\x2c\x9e\x34\xd6\x03\x7e\xd5\x1d\x68\xf5\xed\xf9\xdc\xe0\xc7\x4b\x83\x84\xfa\x0e\x3d\xe2\x5b\xf4\xa8\xd5\xdd\x4b\x18\x21\xc4\x78\x46\xa5\xba\x8d\xee\xb1\xac\xdd\x14\x66\x2d\xcf\xb4\x30\x8c\x2b\x98\x98\x63\xb8\x6f\x08\x86\xfb\x6e\x84\x28\xa3\x29\x8d\x15\xdc\xb4\x5d\x5b\x37\x46\x93\x16\x43\x3b\x17\x13\xef\xc5\xd0\x87\x14\xab\xe9\x69\xf5\xa7\x34\xb9\x99\x9e\xf8\x1f\x00\x00\xff\xff\xa4\x05\xfd\x0a\x9f\x03\x00\x00")
func _5_nwaku_schemaDownSqlBytes() ([]byte, error) {
return bindataRead(
__5_nwaku_schemaDownSql,
"5_nwaku_schema.down.sql",
)
}
func _5_nwaku_schemaDownSql() (*asset, error) {
bytes, err := _5_nwaku_schemaDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "5_nwaku_schema.down.sql", size: 927, mode: os.FileMode(0664), modTime: time.Unix(1702924493, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0x6e, 0xcc, 0x65, 0x3c, 0x21, 0x2a, 0x77, 0x18, 0x7b, 0xe9, 0x5d, 0x97, 0x8e, 0x87, 0xc8, 0xa6, 0x3b, 0x1a, 0x4d, 0xe6, 0xb1, 0x26, 0x95, 0xdb, 0x84, 0xcc, 0xbb, 0x4d, 0x76, 0x70, 0xc7}}
return a, nil
}
var __5_nwaku_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x94\x53\xc1\x8e\xda\x30\x10\xbd\xfb\x2b\xde\x91\x48\xd9\x43\x7b\xcd\x29\x80\xdb\x8d\x1a\xec\x95\xf1\x6a\xbb\x27\x64\xf0\x68\xb1\x14\x92\x34\x76\xa0\xfc\x7d\x45\xa9\x81\xb0\xe1\xd0\x43\x0e\x99\xf7\xf2\x66\xe6\xcd\x4b\x5e\x6a\xae\xa0\xf3\x69\xc9\xb1\x23\xef\xcd\x07\x41\x71\x91\x2f\x38\xb4\x8c\x95\x55\x53\xd9\x8c\xb1\xb9\x92\x2f\x28\xc4\x9c\xff\xbc\x00\x9e\x6a\x4b\x9d\x76\x3b\xf2\xc1\xec\xda\x6c\x8c\xd3\xd1\x86\xdc\xfe\x11\xcb\xad\x76\xfe\x63\xf5\x65\xa4\xf6\x35\x63\x6c\xa6\x78\xae\xf9\xdd\x7c\x13\x06\xb4\xfd\xda\xf7\x6b\xdd\xb4\x6e\x83\x69\x29\xa7\x10\x52\x43\xbc\x96\x65\xca\x80\x4d\x53\x07\xaa\xc3\x03\xb4\x35\xc7\xaa\x31\xf6\x2f\x70\x7a\xdf\x53\xe7\x5d\x53\xa3\x10\x9a\x7f\xe7\x6a\xc0\x0d\x71\xe6\x51\xd4\x5d\x45\xfe\xcd\xf6\x6c\xfc\xf6\x5c\xc3\xd3\x13\x04\x1d\xaa\x23\x8c\xb5\x64\x53\x84\xad\xf3\x38\xb8\xaa\xc2\x9a\xd0\x36\x6d\x5f\x99\x40\x16\x07\x17\xb6\x30\xd8\x34\x7d\x1d\xa8\xc3\xde\x54\x3d\x31\xc0\x87\xa6\x23\x9b\x87\xd1\xbe\x33\x29\x96\x5a\xe5\x85\xd0\xb1\x6f\x51\x5b\xfa\x8d\x17\x55\x2c\x72\xf5\x8e\x1f\xfc\x1d\x93\x9b\x89\x12\x96\xe0\xad\xd0\xcf\xf2\x55\x43\xc9\xb7\x62\x9e\x45\x63\xa3\xdf\xc1\x43\x0a\x2c\xa2\xc1\xb1\x7b\xf2\x89\xf8\xab\xa7\xee\x38\xe0\xde\x9a\x9d\xde\xde\x25\xbd\x2c\x91\xc2\xd9\x24\x63\xac\x10\x4b\xae\xf4\x69\xa5\x4b\xb4\x26\x83\x0f\xee\xb4\xce\x87\x4a\xe3\x85\xd2\xeb\x39\x4e\x8a\xe9\xad\xe9\xd7\x66\x09\x5b\xf2\x92\xcf\x34\xfe\x4b\xf9\x2e\xc8\x67\xfd\xd3\xf3\x29\xbd\xec\x9b\x92\x8b\xb1\x3f\x63\x10\xd2\x33\xf0\x27\x00\x00\xff\xff\xe5\x34\x0a\x93\x5e\x03\x00\x00")
func _5_nwaku_schemaUpSqlBytes() ([]byte, error) {
return bindataRead(
__5_nwaku_schemaUpSql,
"5_nwaku_schema.up.sql",
)
}
func _5_nwaku_schemaUpSql() (*asset, error) {
bytes, err := _5_nwaku_schemaUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "5_nwaku_schema.up.sql", size: 862, mode: os.FileMode(0664), modTime: time.Unix(1702926452, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8a, 0xc9, 0x53, 0x70, 0xe8, 0x7d, 0xc5, 0x25, 0x14, 0xa4, 0x78, 0x9c, 0x89, 0xe1, 0x57, 0xdc, 0xef, 0x35, 0xa6, 0xab, 0xfb, 0x6d, 0x19, 0x28, 0xd7, 0xd0, 0xe5, 0xd8, 0x7e, 0x1, 0xbf, 0x9e}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -364,6 +406,10 @@ var _bindata = map[string]func() (*asset, error){
"4_signed_peer_record.up.sql": _4_signed_peer_recordUpSql,
"5_nwaku_schema.down.sql": _5_nwaku_schemaDownSql,
"5_nwaku_schema.up.sql": _5_nwaku_schemaUpSql,
"doc.go": docGo,
}
@ -416,6 +462,8 @@ var _bintree = &bintree{nil, map[string]*bintree{
"3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}},
"4_signed_peer_record.down.sql": &bintree{_4_signed_peer_recordDownSql, map[string]*bintree{}},
"4_signed_peer_record.up.sql": &bintree{_4_signed_peer_recordUpSql, map[string]*bintree{}},
"5_nwaku_schema.down.sql": &bintree{_5_nwaku_schemaDownSql, map[string]*bintree{}},
"5_nwaku_schema.up.sql": &bintree{_5_nwaku_schemaUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}

View File

@ -0,0 +1,25 @@
-- 5_nwaku_schema.down.sql (sqlite): revert the nwaku-compatible schema back
-- to the previous go-waku message schema.
ALTER TABLE message RENAME TO message_old;
-- Remove the nwaku-style indexes before recreating the old ones.
DROP INDEX i_ts;
DROP INDEX i_query;
CREATE TABLE message (
id BLOB,
receiverTimestamp INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL DEFAULT 0,
CONSTRAINT messageIndex PRIMARY KEY (id, pubsubTopic)
) WITHOUT ROWID;
CREATE INDEX message_senderTimestamp ON message(senderTimestamp);
CREATE INDEX message_receiverTimestamp ON message(receiverTimestamp);
CREATE INDEX i_msg_1 ON message(contentTopic ASC, pubsubTopic ASC, senderTimestamp ASC, id ASC);
CREATE INDEX i_msg_2 ON message(contentTopic DESC, pubsubTopic DESC, senderTimestamp DESC, id DESC);
-- Copy the data back, mapping storedAt -> receiverTimestamp and
-- timestamp -> senderTimestamp (inverse of the up migration).
-- Note: fixed inconsistent identifier casing (contentTOpic -> contentTopic).
INSERT INTO message(id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version)
SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version
FROM message_old;
DROP TABLE message_old;

View File

@ -0,0 +1,26 @@
-- 5_nwaku_schema.up.sql (sqlite): migrate the go-waku message table to the
-- schema used by nwaku so both implementations can share one database.
ALTER TABLE message RENAME TO message_old;
-- Drop the old go-waku indexes; the nwaku-style ones are created below.
DROP INDEX message_senderTimestamp;
DROP INDEX message_receiverTimestamp;
DROP INDEX i_msg_1;
DROP INDEX i_msg_2;
CREATE TABLE message (
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
id BLOB,
messageHash BLOB, -- Newly added; for migrated rows the INSERT below fills this with the old id (no real hash exists yet)
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash)
) WITHOUT ROWID;
CREATE INDEX i_ts ON Message (storedAt);
CREATE INDEX i_query ON Message (contentTopic, pubsubTopic, storedAt, id);
-- Copy the data, mapping senderTimestamp -> timestamp and
-- receiverTimestamp -> storedAt; id doubles as the messageHash.
INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt)
SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, id, receiverTimestamp
FROM message_old;
DROP TABLE message_old;

View File

@ -0,0 +1,82 @@
package sqlite
import (
"database/sql"
"errors"
"fmt"
)
// Range of nwaku sqlite user_version values this code knows how to migrate from.
const minSupportedNWAKUversion = 8
const maxSupportedNWAKUversion = 8
// handleNWakuPreMigration detects whether db was created by nwaku (via the
// sqlite PRAGMA user_version) and, if it was and has not yet been migrated by
// go-waku, prepares it for go-waku's migrations by renaming the existing
// message table to message_nwaku and dropping the nwaku indexes. It returns
// true when the database is an nwaku database that still needs migrating.
func handleNWakuPreMigration(db *sql.DB) (bool, error) {
// Check if there's an user version in the DB, and if migration table does not exist.
// Rename existing table, and move data afterwards
var nwakuDBVersion int
err := db.QueryRow("PRAGMA user_version").Scan(&nwakuDBVersion)
if err != nil {
return false, fmt.Errorf("could not obtain sqlite user_version while attempting to migrate nwaku database: %w", err)
}
// Count applied go-waku migrations; a value > 0 means go-waku has already
// migrated this database.
var gowakuDBVersion int
err = db.QueryRow(`SELECT COUNT(*) FROM schema_migrations`).Scan(&gowakuDBVersion)
// NOTE(review): this condition only triggers when err IS sql.ErrNoRows, which
// a COUNT(*) query never yields; any other error (e.g. schema_migrations not
// existing on a fresh database) is silently ignored and leaves
// gowakuDBVersion at 0 — confirm this is intentional.
if err != nil && errors.Is(err, sql.ErrNoRows) {
return false, fmt.Errorf("could not obtain schema_migrations data while attempting to migrate nwaku database: %w", err)
}
if nwakuDBVersion == 0 {
// not a nwaku db
return false, nil
}
// Only the known range of nwaku schema versions can be migrated safely.
if nwakuDBVersion < minSupportedNWAKUversion || nwakuDBVersion > maxSupportedNWAKUversion {
err = fmt.Errorf("unsupported nwaku DB %d - Supported versions [%d,%d]", nwakuDBVersion, minSupportedNWAKUversion, maxSupportedNWAKUversion)
return false, err
}
if gowakuDBVersion > 0 {
// We have already migrated this database
return false, nil
}
// Park the nwaku data so it can be re-inserted once the go-waku migrations
// have created the new message table.
_, err = db.Exec("ALTER TABLE message RENAME TO message_nwaku")
if err != nil {
return false, fmt.Errorf("could not rename nwaku message table while attempting to migrate nwaku database: %w", err)
}
_, err = db.Exec("DROP INDEX i_ts;")
if err != nil {
return false, fmt.Errorf("could not drop indexes while attempting to migrate nwaku database: %w", err)
}
_, err = db.Exec("DROP INDEX i_query;")
if err != nil {
return false, fmt.Errorf("could not drop indexes while attempting to migrate nwaku database: %w", err)
}
return true, nil
}
// handleNWakuPostMigration copies the rows preserved in message_nwaku into the
// freshly migrated message table and then removes the temporary table.
func handleNWakuPostMigration(db *sql.DB) error {
	if _, err := db.Exec("INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) SELECT pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt FROM message_nwaku"); err != nil {
		return fmt.Errorf("could not migrate nwaku messages: %w", err)
	}

	if _, err := db.Exec("DROP TABLE message_nwaku"); err != nil {
		return fmt.Errorf("could not drop nwaku message table: %w", err)
	}

	return nil
}
// revertNWakuPreMigration undoes the rename performed by
// handleNWakuPreMigration, restoring the original nwaku message table name.
func revertNWakuPreMigration(db *sql.DB) error {
	if _, err := db.Exec("ALTER TABLE message_nwaku RENAME TO message"); err != nil {
		return fmt.Errorf("could not revert changes to nwaku db: %w", err)
	}
	return nil
}

View File

@ -45,17 +45,43 @@ func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) {
func migrationDriver(db *sql.DB) (database.Driver, error) {
return sqlite3.WithInstance(db, &sqlite3.Config{
MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable,
MigrationsTable: sqlite3.DefaultMigrationsTable,
})
}
// Migrations is the function used for DB migration with sqlite driver
func Migrations(db *sql.DB) error {
func Migrations(db *sql.DB, logger *zap.Logger) error {
migrationDriver, err := migrationDriver(db)
if err != nil {
return err
}
return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
isNwaku, err := handleNWakuPreMigration(db)
if err != nil {
return err
}
err = migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
if err != nil {
if isNwaku {
migrationErr := migrate.MigrateDown(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
if migrationErr != nil {
logger.Fatal("could not revert table changes in nwaku database", zap.Error(err))
}
revertErr := revertNWakuPreMigration(db)
if revertErr != nil {
logger.Fatal("could not revert table changes in nwaku database", zap.Error(err))
}
}
return err
}
if isNwaku {
return handleNWakuPostMigration(db)
}
return nil
}
// CreateTable creates the table that will persist the peers

View File

@ -49,7 +49,7 @@ type DBStore struct {
MessageProvider
db *sql.DB
migrationFn func(db *sql.DB) error
migrationFn func(db *sql.DB, logger *zap.Logger) error
metrics Metrics
timesource timesource.Timesource
@ -121,7 +121,7 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
}
}
type MigrationFn func(db *sql.DB) error
type MigrationFn func(db *sql.DB, logger *zap.Logger) error
// WithMigrations is a DBOption used to determine if migrations should
// be executed, and what driver to use
@ -157,7 +157,7 @@ func NewDBStore(reg prometheus.Registerer, log *zap.Logger, options ...DBOption)
}
if result.enableMigrations {
err := result.migrationFn(result.db)
err := result.migrationFn(result.db, log)
if err != nil {
return nil, err
}
@ -211,7 +211,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
// Delete older messages
if d.maxDuration > 0 {
start := time.Now()
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1`
sqlStmt := `DELETE FROM message WHERE storedAt < $1`
_, err := d.db.Exec(sqlStmt, d.timesource.Now().Add(-d.maxDuration).UnixNano())
if err != nil {
d.metrics.RecordError(retPolicyFailure)
@ -240,7 +240,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
}
func (d *DBStore) getDeleteOldRowsQuery() string {
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC %s OFFSET $1)`
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY storedAt DESC %s OFFSET $1)`
switch GetDriverType(d.db) {
case SQLiteDriver:
sqlStmt = fmt.Sprintf(sqlStmt, "LIMIT -1")
@ -282,7 +282,12 @@ func (d *DBStore) Stop() {
// Validate validates the message to be stored against possible fradulent conditions.
func (d *DBStore) Validate(env *protocol.Envelope) error {
n := time.Unix(0, env.Index().ReceiverTime)
timestamp := env.Message().GetTimestamp()
if timestamp == 0 {
return nil
}
n := time.Unix(0, timestamp)
upperBound := n.Add(MaxTimeVariance)
lowerBound := n.Add(-MaxTimeVariance)
@ -300,17 +305,20 @@ func (d *DBStore) Validate(env *protocol.Envelope) error {
// Put inserts a WakuMessage into the DB
func (d *DBStore) Put(env *protocol.Envelope) error {
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)")
stmt, err := d.db.Prepare("INSERT INTO message (id, messageHash, storedAt, timestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
if err != nil {
d.metrics.RecordError(insertFailure)
return err
}
cursor := env.Index()
dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest)
storedAt := env.Message().GetTimestamp()
if storedAt == 0 {
storedAt = env.Index().ReceiverTime
}
start := time.Now()
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion())
_, err = stmt.Exec(env.Index().Digest, env.Hash(), storedAt, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion())
if err != nil {
return err
}
@ -329,36 +337,33 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi
usesCursor := false
if query.PagingInfo.Cursor != nil {
usesCursor = true
var exists bool
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)",
cursorDBKey.Bytes(),
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE storedAt = $1 AND id = $2)",
query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest,
).Scan(&exists)
if err != nil {
return nil, nil, err
}
if exists {
eqOp := ">"
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
eqOp = "<"
}
*paramCnt++
conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, *paramCnt))
parameters = append(parameters, cursorDBKey.Bytes())
} else {
if !exists {
return nil, nil, ErrInvalidCursor
}
eqOp := ">"
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
eqOp = "<"
}
conditions = append(conditions, fmt.Sprintf("(storedAt, id) %s ($%d, $%d)", eqOp, *paramCnt+1, *paramCnt+2))
*paramCnt += 2
parameters = append(parameters, query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest)
}
handleTimeParam := func(time int64, op string) {
*paramCnt++
conditions = append(conditions, fmt.Sprintf("id %s $%d", op, *paramCnt))
timeDBKey := NewDBKey(uint64(time), 0, "", []byte{})
parameters = append(parameters, timeDBKey.Bytes())
conditions = append(conditions, fmt.Sprintf("storedAt %s $%d", op, *paramCnt))
parameters = append(parameters, time)
}
startTime := query.GetStartTime()
@ -378,10 +383,10 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi
}
func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}, error) {
sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version
sqlQuery := `SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version
FROM message
%s
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s `
ORDER BY timestamp %s, id %s, pubsubTopic %s, storedAt %s `
var conditions []string
//var parameters []interface{}
@ -428,7 +433,7 @@ func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}
parameters = append(parameters, pageSize)
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection)
d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery))
d.log.Debug(fmt.Sprintf("sqlQuery: %s", sqlQuery))
return sqlQuery, parameters, nil
@ -490,12 +495,12 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
return cursor, result, nil
}
// MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp
// MostRecentTimestamp returns an unix timestamp with the most recent timestamp
// in the message table
func (d *DBStore) MostRecentTimestamp() (int64, error) {
result := sql.NullInt64{}
err := d.db.QueryRow(`SELECT max(senderTimestamp) FROM message`).Scan(&result)
err := d.db.QueryRow(`SELECT max(timestamp) FROM message`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
@ -520,7 +525,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
d.log.Info("loading records from the DB", zap.Duration("duration", elapsed))
}()
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
rows, err := d.db.Query("SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY timestamp ASC")
if err != nil {
return nil, err
}
@ -550,14 +555,14 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
// GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage`
func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
var id []byte
var receiverTimestamp int64
var senderTimestamp int64
var storedAt int64
var timestamp int64
var contentTopic string
var payload []byte
var version uint32
var pubsubTopic string
err := row.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
err := row.Scan(&id, &storedAt, &timestamp, &contentTopic, &pubsubTopic, &payload, &version)
if err != nil {
d.log.Error("scanning messages from db", zap.Error(err))
return StoredMessage{}, err
@ -567,8 +572,8 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
msg.ContentTopic = contentTopic
msg.Payload = payload
if senderTimestamp != 0 {
msg.Timestamp = proto.Int64(senderTimestamp)
if timestamp != 0 {
msg.Timestamp = proto.Int64(timestamp)
}
if version > 0 {
@ -578,7 +583,7 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
record := StoredMessage{
ID: id,
PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp,
ReceiverTime: storedAt,
Message: msg,
}

View File

@ -26,9 +26,9 @@ type DBSettings struct {
}
// ParseURL will return a database connection, and migration function that should be used depending on a database connection string
func ParseURL(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) {
func ParseURL(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB, *zap.Logger) error, error) {
var db *sql.DB
var migrationFn func(*sql.DB) error
var migrationFn func(*sql.DB, *zap.Logger) error
var err error
logger = logger.Named("db-setup")

View File

@ -20,13 +20,14 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
func TestStore(t *testing.T) {
tests := []struct {
name string
fn func(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error)
fn func(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error)
}{
{"testDbStore", testDbStore},
{"testStoreRetention", testStoreRetention},
@ -43,7 +44,7 @@ func TestStore(t *testing.T) {
}
}
func getDB(driver string) (*sql.DB, func(*sql.DB) error) {
func getDB(driver string) (*sql.DB, func(*sql.DB, *zap.Logger) error) {
switch driver {
case "postgres":
return postgres.NewMockPgDB(), postgres.Migrations
@ -52,7 +53,7 @@ func getDB(driver string) (*sql.DB, func(*sql.DB) error) {
}
return nil, nil
}
func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) {
func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) {
store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn))
require.NoError(t, err)
@ -71,7 +72,7 @@ func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) {
require.NotEmpty(t, res)
}
func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) {
func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) {
store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(5, 20*time.Second))
require.NoError(t, err)
@ -112,7 +113,7 @@ func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) erro
require.Equal(t, msgCount, 3)
}
func testQuery(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) {
func testQuery(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) {
store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(5, 20*time.Second))
require.NoError(t, err)

View File

@ -48,7 +48,7 @@ func TestRendezvous(t *testing.T) {
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
err = sqlite.Migrations(db)
err = sqlite.Migrations(db, utils.Logger())
require.NoError(t, err)
rdb := NewDB(db, utils.Logger())