From 846183d515db42ac5338a64311888563efb281a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 3 Jan 2024 12:49:54 -0400 Subject: [PATCH] chore: use nwaku's db format (#965) --- cmd/waku/node.go | 2 +- library/node.go | 3 +- waku/persistence/db_key.go | 56 ------------ waku/persistence/migrate/migrate.go | 19 ++++- .../postgres/migrations/bindata.go | 48 +++++++++++ .../migrations/sql/5_nwaku_schema.down.sql | 25 ++++++ .../migrations/sql/5_nwaku_schema.up.sql | 27 ++++++ waku/persistence/postgres/postgres.go | 4 +- waku/persistence/sqlite/migrations/bindata.go | 48 +++++++++++ .../migrations/sql/5_nwaku_schema.down.sql | 25 ++++++ .../migrations/sql/5_nwaku_schema.up.sql | 26 ++++++ waku/persistence/sqlite/nwaku.go | 82 ++++++++++++++++++ waku/persistence/sqlite/sqlite.go | 32 ++++++- waku/persistence/store.go | 85 ++++++++++--------- waku/persistence/utils/db.go | 4 +- waku/persistence/utils/store_test.go | 11 +-- waku/v2/rendezvous/rendezvous_test.go | 2 +- 17 files changed, 385 insertions(+), 114 deletions(-) delete mode 100644 waku/persistence/db_key.go create mode 100644 waku/persistence/postgres/migrations/sql/5_nwaku_schema.down.sql create mode 100644 waku/persistence/postgres/migrations/sql/5_nwaku_schema.up.sql create mode 100644 waku/persistence/sqlite/migrations/sql/5_nwaku_schema.down.sql create mode 100644 waku/persistence/sqlite/migrations/sql/5_nwaku_schema.up.sql create mode 100644 waku/persistence/sqlite/nwaku.go diff --git a/cmd/waku/node.go b/cmd/waku/node.go index ef725a1c..a56a616c 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -109,7 +109,7 @@ func Execute(options NodeOptions) error { logger := utils.Logger().With(logging.HostID("node", id)) var db *sql.DB - var migrationFn func(*sql.DB) error + var migrationFn func(*sql.DB, *zap.Logger) error if requiresDB(options) && options.Store.Migration { dbSettings := dbutils.DBSettings{} db, migrationFn, err = dbutils.ParseURL(options.Store.DatabaseURL, dbSettings, logger) diff --git a/library/node.go b/library/node.go index 408461bc..34c06931 100644 --- a/library/node.go +++ b/library/node.go @@ -13,6 +13,7 @@ import ( "time" "unsafe" + "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/ethereum/go-ethereum/common/hexutil" @@ -202,7 +203,7 @@ func NewNode(instance *WakuInstance, configJSON string) error { if *config.EnableStore { var db *sql.DB - var migrationFn func(*sql.DB) error + var migrationFn func(*sql.DB, *zap.Logger) error db, migrationFn, err = dbutils.ParseURL(*config.DatabaseURL, dbutils.DBSettings{}, utils.Logger()) if err != nil { return err diff --git a/waku/persistence/db_key.go b/waku/persistence/db_key.go deleted file mode 100644 index 858fd5bb..00000000 --- a/waku/persistence/db_key.go +++ /dev/null @@ -1,56 +0,0 @@ -package persistence - -import ( - "encoding/binary" - "errors" - - "github.com/waku-org/go-waku/waku/v2/hash" -) - -const ( - TimestampLength = 8 - HashLength = 32 - DigestLength = HashLength - PubsubTopicLength = HashLength - DBKeyLength = TimestampLength + PubsubTopicLength + DigestLength -) - -type Hash [HashLength]byte - -var ( - // ErrInvalidByteSize is returned when DBKey can't be created - // from a byte slice because it has invalid length. - ErrInvalidByteSize = errors.New("byte slice has invalid length") -) - -// DBKey key to be stored in a db. -type DBKey struct { - raw []byte -} - -// Bytes returns a bytes representation of the DBKey. -func (k *DBKey) Bytes() []byte { - return k.raw -} - -// NewDBKey creates a new DBKey with the given values. -func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey { - pubSubHash := make([]byte, PubsubTopicLength) - if pubsubTopic != "" { - pubSubHash = hash.SHA256([]byte(pubsubTopic)) - } - - var k DBKey - k.raw = make([]byte, DBKeyLength) - - if senderTimestamp == 0 { - binary.BigEndian.PutUint64(k.raw, receiverTimestamp) - } else { - binary.BigEndian.PutUint64(k.raw, senderTimestamp) - } - - copy(k.raw[TimestampLength:], pubSubHash[:]) - copy(k.raw[TimestampLength+PubsubTopicLength:], digest) - - return &k -} diff --git a/waku/persistence/migrate/migrate.go b/waku/persistence/migrate/migrate.go index e90dcea2..cc4f7963 100644 --- a/waku/persistence/migrate/migrate.go +++ b/waku/persistence/migrate/migrate.go @@ -14,11 +14,19 @@ func Migrate(db *sql.DB, driver database.Driver, assetNames []string, assetFunc return migrateDB(db, bindata.Resource( assetNames, assetFunc, - ), driver) + ), driver, true) +} + +// Migrate applies migrations. +func MigrateDown(db *sql.DB, driver database.Driver, assetNames []string, assetFunc bindata.AssetFunc) error { + return migrateDB(db, bindata.Resource( + assetNames, + assetFunc, + ), driver, false) } // Migrate database using provided resources. -func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error { +func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver, up bool) error { source, err := bindata.WithInstance(resources) if err != nil { return err @@ -33,7 +41,12 @@ func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Drive return err } - if err = m.Up(); err != migrate.ErrNoChange { + fn := m.Up + if !up { + fn = m.Down + } + + if err = fn(); err != migrate.ErrNoChange { return err } return nil diff --git a/waku/persistence/postgres/migrations/bindata.go b/waku/persistence/postgres/migrations/bindata.go index 9e451d2d..40f23c27 100644 --- a/waku/persistence/postgres/migrations/bindata.go +++ b/waku/persistence/postgres/migrations/bindata.go @@ -8,6 +8,8 @@ // 3_rendezvous.up.sql (181B) // 4_signed_peer_record.down.sql (166B) // 4_signed_peer_record.up.sql (178B) +// 5_nwaku_schema.down.sql (891B) +// 5_nwaku_schema.up.sql (838B) // doc.go (74B) package migrations @@ -237,6 +239,46 @@ func _4_signed_peer_recordUpSql() (*asset, error) { return a, nil } +var __5_nwaku_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x53\x4f\x8f\xba\x30\x10\x3d\xd3\x4f\x31\x47\x49\x7a\xf8\xfd\xf6\xea\xa9\xca\x68\xc8\x62\x31\xb5\x26\xeb\x89\xa0\x34\xa6\x89\x80\x4b\xab\x89\xdf\x7e\x83\xf5\x0f\x14\xdd\x3d\x41\xde\x1b\xde\x9b\x79\xc3\xb0\x44\xa2\x00\xc9\x26\x09\x42\xa9\x8c\xc9\xf7\x0a\x04\x72\xb6\x40\x90\xe9\x1d\xc9\xea\x43\x31\x26\x91\x48\x97\x10\xf3\x08\xbf\x40\x67\xd6\x78\xc0\xf7\x49\x35\x97\x31\x21\x53\x81\x4c\xa2\xa7\x38\x22\x81\x2e\x60\xb2\x91\xc8\x28\x09\x1a\xb5\x53\xfa\xac\x1a\xa9\x4b\x65\x6c\x5e\x1e\x61\x12\xcf\x63\x2e\x81\xa7\x12\xf8\x3a\x49\x28\x09\x8c\xaa\x8a\xdf\x2b\x76\x75\x65\x55\x65\x65\x7d\xd4\x3b\x27\xdd\x65\x8f\xa7\xad\x39\x6d\xdf\x91\xf9\xe5\x50\xe7\xcf\x86\xce\xaa\x31\xba\xae\x20\xe6\x12\xe7\x28\x1e\xa5\x10\xe1\x8c\xad\x13\x09\xff\x28\x09\x96\x22\x5e\x30\xb1\x81\x4f\xdc\xc0\x48\x17\x14\x3a\x16\x21\x09\x9f\xa3\xbb\x44\xee\xd1\xf9\x83\xa4\xfc\x4e\x8d\x3c\x2a\x1c\xbf\x56\x18\xc6\xd5\xd1\x18\x90\xbe\x8a\xce\x4a\xb3\xcf\xfe\x77\xbf\xe9\x25\xc7\x56\xd3\xde\x28\x0e\xf0\xbb\xbe\x82\xba\x68\x9f\xaf\x0d\x3e\xde\x1a\x44\xe8\x3b\x38\xc4\xb7\x70\xa8\x2e\xae\x2f\x6d\x9c\x31\x5f\xa1\x90\xed\x52\x1e\x3f\xe2\x35\xf7\xc1\xc8\x03\x2d\x0a\xdd\x0e\x7a\xe6\x14\x6e\xcb\xa7\x70\xdb\x7a\x48\x56\x98\xe0\x54\x42\xab\x6d\x6c\xdd\xa8\x82\x59\x0a\x76\x28\x96\xfe\x2d\x46\x66\x22\x5d\xf4\xcf\xc6\x9d\x49\xef\x1e\x1c\xf1\x13\x00\x00\xff\xff\xdf\x9a\x72\x0e\x7b\x03\x00\x00") + +func _5_nwaku_schemaDownSqlBytes() ([]byte, error) { + return bindataRead( + __5_nwaku_schemaDownSql, + "5_nwaku_schema.down.sql", + ) +} + +func _5_nwaku_schemaDownSql() (*asset, error) { + bytes, err := _5_nwaku_schemaDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "5_nwaku_schema.down.sql", size: 891, mode: os.FileMode(0664), modTime: time.Unix(1704298105, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x41, 0xe2, 0x4, 0x10, 0x40, 0x1d, 0x4b, 0xcc, 0x5b, 0x7b, 0x36, 0x78, 0x23, 0x8c, 0x7, 0xdc, 0x51, 0x63, 0xe9, 0x9e, 0x19, 0x78, 0xe2, 0x22, 0xee, 0x51, 0xa8, 0x26, 0x85, 0x48, 0x41, 0xb2}} + return a, nil +} + +var __5_nwaku_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x94\x93\x41\x8f\xda\x30\x10\x85\xef\xfe\x15\xef\x08\x92\x57\x6a\x7b\xe5\x64\x16\xef\x36\x6a\x70\x56\x5e\xaf\x54\x4e\xc8\xe0\xd1\x62\x29\xc4\x69\xec\x80\xf8\xf7\x55\xc4\x06\x02\x0d\x87\x1e\x72\x99\x79\x79\x33\xf9\xe6\x45\xe4\x46\x6a\x18\x31\xcf\x25\xf6\x14\xa3\xfd\x24\x68\xa9\xc4\x52\xc2\x14\x7d\x65\x1d\x4a\x37\x63\x6c\xa1\x8b\x37\x64\x6a\x21\x7f\x5f\x1a\x91\x2a\x47\x8d\xf1\x7b\x8a\xc9\xee\xeb\xd9\x98\xa6\xa1\x2d\xf9\xc3\x23\x95\x5f\xef\xe3\xe7\xfa\xfb\x48\xed\xc7\x8c\xb1\x67\x2d\x85\x91\x77\xfb\x4d\x18\x50\xb7\x9b\xd8\x6e\x4c\xa8\xfd\x16\xf3\x95\x91\x02\xaa\x30\x50\x1f\x79\xce\x19\xb0\x0d\x55\xa2\x2a\x3d\x6a\xd7\xf6\x54\x06\xeb\xce\x9d\xae\x70\xa0\x26\xfa\x50\x21\x53\x46\xbe\x4a\x7d\x11\x63\x21\x5f\xc4\x47\x6e\xf0\xad\x53\xa5\xfe\x03\x30\xcf\x5e\x33\x65\x6e\x3c\xfd\xc0\xee\x6b\xd1\x9f\x36\xee\xbe\x8a\x78\x7a\x82\xa2\x63\x79\x82\x75\x8e\x1c\x47\xda\xf9\x88\xa3\x2f\x4b\x6c\x08\x75\xa8\xdb\xd2\x26\x72\x38\xfa\xb4\x83\xc5\x36\xb4\x55\xa2\x06\x07\x5b\xb6\xc4\x80\x98\x42\x43\x4e\xa4\xb1\xc1\x6f\x3a\x5b\x0a\xbd\xc2\x2f\xb9\xc2\x64\x30\x79\xca\xa6\x57\x80\x3d\xd7\x14\x51\x28\x2c\x7b\x90\xbd\xef\x74\x76\x2f\xfc\xd3\x52\x73\xba\xd1\x0e\x99\xf2\x21\x7f\x7e\x59\x8f\xc3\xbb\x6e\x68\xa6\xde\xa5\x36\x1d\xcd\x4b\x84\x26\x37\x2f\xdc\x79\x9d\xcf\xc1\xfb\x33\xf0\x2b\xe9\xce\x91\x0f\x79\x5e\x87\x4d\xd9\xbb\xcc\xe5\xb3\xc1\x7f\x39\xdf\x05\xf6\xec\xdf\x3d\xff\xa4\x94\xbd\xe8\x62\x39\xf6\x07\xdc\x84\xf1\xdc\xf8\x1b\x00\x00\xff\xff\xf2\x0c\xa4\xf2\x46\x03\x00\x00") + +func _5_nwaku_schemaUpSqlBytes() ([]byte, error) { + return bindataRead( + __5_nwaku_schemaUpSql, + "5_nwaku_schema.up.sql", + ) +} + +func _5_nwaku_schemaUpSql() (*asset, error) { + bytes, err := _5_nwaku_schemaUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "5_nwaku_schema.up.sql", size: 838, mode: os.FileMode(0664), modTime: time.Unix(1704298834, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6a, 0x3b, 0xf4, 0xf4, 0xf8, 0xa1, 0x9e, 0xa9, 0x6b, 0x6e, 0x24, 0xe3, 0x2b, 0x26, 0xce, 0x87, 0x2d, 0xdb, 0xea, 0x64, 0x62, 0x99, 0x87, 0xdc, 0x1b, 0x88, 0xb9, 0x35, 0x5f, 0xf7, 0x6c, 0x6e}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -364,6 +406,10 @@ var _bindata = map[string]func() (*asset, error){ "4_signed_peer_record.up.sql": _4_signed_peer_recordUpSql, + "5_nwaku_schema.down.sql": _5_nwaku_schemaDownSql, + + "5_nwaku_schema.up.sql": _5_nwaku_schemaUpSql, + "doc.go": docGo, } @@ -416,6 +462,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}}, "4_signed_peer_record.down.sql": &bintree{_4_signed_peer_recordDownSql, map[string]*bintree{}}, "4_signed_peer_record.up.sql": &bintree{_4_signed_peer_recordUpSql, map[string]*bintree{}}, + "5_nwaku_schema.down.sql": &bintree{_5_nwaku_schemaDownSql, map[string]*bintree{}}, + "5_nwaku_schema.up.sql": &bintree{_5_nwaku_schemaUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/waku/persistence/postgres/migrations/sql/5_nwaku_schema.down.sql b/waku/persistence/postgres/migrations/sql/5_nwaku_schema.down.sql new file mode 100644 index 00000000..2b8f7890 --- /dev/null +++ b/waku/persistence/postgres/migrations/sql/5_nwaku_schema.down.sql @@ -0,0 +1,25 @@ +ALTER TABLE message RENAME TO message_old; +DROP INDEX i_ts; +DROP INDEX i_query; + +CREATE TABLE message ( + id BYTEA, + receiverTimestamp BIGINT NOT NULL, + senderTimestamp BIGINT NOT NULL, + contentTopic BYTEA NOT NULL, + pubsubTopic BYTEA NOT NULL, + payload BYTEA, + version INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (id, pubsubTopic) +); + +CREATE INDEX message_senderTimestamp ON message(senderTimestamp); +CREATE INDEX message_receiverTimestamp ON message(receiverTimestamp); +CREATE INDEX i_msg_1 ON message(contentTopic ASC, pubsubTopic ASC, senderTimestamp ASC, id ASC); +CREATE INDEX i_msg_2 ON message(contentTopic DESC, pubsubTopic DESC, senderTimestamp DESC, id DESC); + +INSERT INTO message(id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) +SELECT id, storedAt, timestamp, contentTOpic, pubsubTopic, payload, version +FROM message_old; + +DROP TABLE message_old; diff --git a/waku/persistence/postgres/migrations/sql/5_nwaku_schema.up.sql b/waku/persistence/postgres/migrations/sql/5_nwaku_schema.up.sql new file mode 100644 index 00000000..1a81f670 --- /dev/null +++ b/waku/persistence/postgres/migrations/sql/5_nwaku_schema.up.sql @@ -0,0 +1,27 @@ +ALTER TABLE message RENAME TO message_old; + +DROP INDEX message_senderTimestamp; +DROP INDEX message_receiverTimestamp; +DROP INDEX i_msg_1; +DROP INDEX i_msg_2; + +CREATE TABLE message ( + pubsubTopic BYTEA NOT NULL, + contentTopic BYTEA NOT NULL, + payload BYTEA, + version INTEGER NOT NULL DEFAULT 0, + timestamp BIGINT NOT NULL, + id BYTEA, + messageHash BYTEA, -- Newly added, this will be populated with a counter value + storedAt BIGINT NOT NULL, + PRIMARY KEY (messageHash) +); + +CREATE INDEX i_ts ON Message (storedAt); +CREATE INDEX i_query ON Message (contentTopic, pubsubTopic, storedAt, id); + +INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) +SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, id, receiverTimestamp +FROM message_old; + +DROP TABLE message_old; diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go index e3ec57bc..13baae9a 100644 --- a/waku/persistence/postgres/postgres.go +++ b/waku/persistence/postgres/postgres.go @@ -25,12 +25,12 @@ func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) { func migrationDriver(db *sql.DB) (database.Driver, error) { return pgx.WithInstance(db, &pgx.Config{ - MigrationsTable: "gowaku_" + pgx.DefaultMigrationsTable, + MigrationsTable: pgx.DefaultMigrationsTable, }) } // Migrations is the function used for DB migration with postgres driver -func Migrations(db *sql.DB) error { +func Migrations(db *sql.DB, logger *zap.Logger) error { migrationDriver, err := migrationDriver(db) if err != nil { return err diff --git a/waku/persistence/sqlite/migrations/bindata.go b/waku/persistence/sqlite/migrations/bindata.go index 7ae76db8..a6e8626b 100644 --- a/waku/persistence/sqlite/migrations/bindata.go +++ b/waku/persistence/sqlite/migrations/bindata.go @@ -8,6 +8,8 @@ // 3_rendezvous.up.sql (204B) // 4_signed_peer_record.down.sql (186B) // 4_signed_peer_record.up.sql (197B) +// 5_nwaku_schema.down.sql (927B) +// 5_nwaku_schema.up.sql (862B) // doc.go (74B) package migrations @@ -237,6 +239,46 @@ func _4_signed_peer_recordUpSql() (*asset, error) { return a, nil } +var __5_nwaku_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x53\x41\xcf\xa2\x30\x10\x3d\xd3\x5f\x31\x47\x49\x7a\xd8\xdd\x2b\xa7\x0a\xfd\x76\x9b\xc5\xf6\x4b\xa9\x71\x3d\x11\xb4\x8d\x69\x22\xe0\x52\x34\xeb\xbf\xdf\x08\xe2\x07\x45\xe3\x09\xf2\x1e\xbc\x37\xf3\x66\x86\xa4\x8a\x4a\x50\x64\x99\x52\x28\x8d\x73\xc5\xc1\x80\xa4\x9c\xac\x28\x28\x31\x20\x79\x7d\xd4\x11\x4a\xa4\xf8\x04\xc6\x13\xfa\x07\x6c\xde\x3a\x0f\xf8\x7b\x36\xcd\x35\x42\x28\x96\x94\x28\xea\x29\x2e\x50\x60\x35\x2c\x53\xb1\xc4\x28\x68\xcc\xde\xd8\x8b\x69\x94\x2d\x8d\x6b\x8b\xf2\x04\x8c\x2b\xfa\x93\x4a\xe0\x42\x01\x5f\xa7\x29\x46\x81\x33\x95\x7e\xf3\xc9\xbe\xae\x5a\x53\xb5\xaa\x3e\xd9\x7d\xa7\x3d\x26\x4f\xe7\x9d\x3b\xef\x5e\x70\xc5\xf5\x58\x17\x8f\x7a\x2e\xa6\x71\xb6\xae\x66\x16\x90\xd0\x0f\xb2\x4e\x15\x7c\xc3\x28\x88\x05\xcf\x94\x24\x8c\xab\xa1\x29\x56\x69\xf3\x0f\x3e\x25\x5b\x11\xb9\x85\xdf\x74\x0b\x0b\xab\x31\x8c\x8c\x43\x14\xc2\x86\xa9\x5f\x62\xad\x40\x8a\x0d\x4b\xbe\xe2\xe9\x53\x1b\xe2\xf5\x9b\x15\x7c\xa0\x16\x1e\x15\x46\xcf\x15\xe6\x99\x8e\x34\x66\xa4\xaf\x62\xf3\xd2\x1d\xf2\xef\xe3\x7f\x26\xe1\x92\x2c\x9e\x34\xd6\x03\x7e\xd5\x1d\x68\xf5\xed\xf9\xdc\xe0\xc7\x4b\x83\x84\xfa\x0e\x3d\xe2\x5b\xf4\xa8\xd5\xdd\x4b\x18\x21\xc4\x78\x46\xa5\xba\x8d\xee\xb1\xac\xdd\x14\x66\x2d\xcf\xb4\x30\x8c\x2b\x98\x98\x63\xb8\x6f\x08\x86\xfb\x6e\x84\x28\xa3\x29\x8d\x15\xdc\xb4\x5d\x5b\x37\x46\x93\x16\x43\x3b\x17\x13\xef\xc5\xd0\x87\x14\xab\xe9\x69\xf5\xa7\x34\xb9\x99\x9e\xf8\x1f\x00\x00\xff\xff\xa4\x05\xfd\x0a\x9f\x03\x00\x00") + +func _5_nwaku_schemaDownSqlBytes() ([]byte, error) { + return bindataRead( + __5_nwaku_schemaDownSql, + "5_nwaku_schema.down.sql", + ) +} + +func _5_nwaku_schemaDownSql() (*asset, error) { + bytes, err := _5_nwaku_schemaDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "5_nwaku_schema.down.sql", size: 927, mode: os.FileMode(0664), modTime: time.Unix(1702924493, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0x6e, 0xcc, 0x65, 0x3c, 0x21, 0x2a, 0x77, 0x18, 0x7b, 0xe9, 0x5d, 0x97, 0x8e, 0x87, 0xc8, 0xa6, 0x3b, 0x1a, 0x4d, 0xe6, 0xb1, 0x26, 0x95, 0xdb, 0x84, 0xcc, 0xbb, 0x4d, 0x76, 0x70, 0xc7}} + return a, nil +} + +var __5_nwaku_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x94\x53\xc1\x8e\xda\x30\x10\xbd\xfb\x2b\xde\x91\x48\xd9\x43\x7b\xcd\x29\x80\xdb\x8d\x1a\xec\x95\xf1\x6a\xbb\x27\x64\xf0\x68\xb1\x14\x92\x34\x76\xa0\xfc\x7d\x45\xa9\x81\xb0\xe1\xd0\x43\x0e\x99\xf7\xf2\x66\xe6\xcd\x4b\x5e\x6a\xae\xa0\xf3\x69\xc9\xb1\x23\xef\xcd\x07\x41\x71\x91\x2f\x38\xb4\x8c\x95\x55\x53\xd9\x8c\xb1\xb9\x92\x2f\x28\xc4\x9c\xff\xbc\x00\x9e\x6a\x4b\x9d\x76\x3b\xf2\xc1\xec\xda\x6c\x8c\xd3\xd1\x86\xdc\xfe\x11\xcb\xad\x76\xfe\x63\xf5\x65\xa4\xf6\x35\x63\x6c\xa6\x78\xae\xf9\xdd\x7c\x13\x06\xb4\xfd\xda\xf7\x6b\xdd\xb4\x6e\x83\x69\x29\xa7\x10\x52\x43\xbc\x96\x65\xca\x80\x4d\x53\x07\xaa\xc3\x03\xb4\x35\xc7\xaa\x31\xf6\x2f\x70\x7a\xdf\x53\xe7\x5d\x53\xa3\x10\x9a\x7f\xe7\x6a\xc0\x0d\x71\xe6\x51\xd4\x5d\x45\xfe\xcd\xf6\x6c\xfc\xf6\x5c\xc3\xd3\x13\x04\x1d\xaa\x23\x8c\xb5\x64\x53\x84\xad\xf3\x38\xb8\xaa\xc2\x9a\xd0\x36\x6d\x5f\x99\x40\x16\x07\x17\xb6\x30\xd8\x34\x7d\x1d\xa8\xc3\xde\x54\x3d\x31\xc0\x87\xa6\x23\x9b\x87\xd1\xbe\x33\x29\x96\x5a\xe5\x85\xd0\xb1\x6f\x51\x5b\xfa\x8d\x17\x55\x2c\x72\xf5\x8e\x1f\xfc\x1d\x93\x9b\x89\x12\x96\xe0\xad\xd0\xcf\xf2\x55\x43\xc9\xb7\x62\x9e\x45\x63\xa3\xdf\xc1\x43\x0a\x2c\xa2\xc1\xb1\x7b\xf2\x89\xf8\xab\xa7\xee\x38\xe0\xde\x9a\x9d\xde\xde\x25\xbd\x2c\x91\xc2\xd9\x24\x63\xac\x10\x4b\xae\xf4\x69\xa5\x4b\xb4\x26\x83\x0f\xee\xb4\xce\x87\x4a\xe3\x85\xd2\xeb\x39\x4e\x8a\xe9\xad\xe9\xd7\x66\x09\x5b\xf2\x92\xcf\x34\xfe\x4b\xf9\x2e\xc8\x67\xfd\xd3\xf3\x29\xbd\xec\x9b\x92\x8b\xb1\x3f\x63\x10\xd2\x33\xf0\x27\x00\x00\xff\xff\xe5\x34\x0a\x93\x5e\x03\x00\x00") + +func _5_nwaku_schemaUpSqlBytes() ([]byte, error) { + return bindataRead( + __5_nwaku_schemaUpSql, + "5_nwaku_schema.up.sql", + ) +} + +func _5_nwaku_schemaUpSql() (*asset, error) { + bytes, err := _5_nwaku_schemaUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "5_nwaku_schema.up.sql", size: 862, mode: os.FileMode(0664), modTime: time.Unix(1702926452, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8a, 0xc9, 0x53, 0x70, 0xe8, 0x7d, 0xc5, 0x25, 0x14, 0xa4, 0x78, 0x9c, 0x89, 0xe1, 0x57, 0xdc, 0xef, 0x35, 0xa6, 0xab, 0xfb, 0x6d, 0x19, 0x28, 0xd7, 0xd0, 0xe5, 0xd8, 0x7e, 0x1, 0xbf, 0x9e}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -364,6 +406,10 @@ var _bindata = map[string]func() (*asset, error){ "4_signed_peer_record.up.sql": _4_signed_peer_recordUpSql, + "5_nwaku_schema.down.sql": _5_nwaku_schemaDownSql, + + "5_nwaku_schema.up.sql": _5_nwaku_schemaUpSql, + "doc.go": docGo, } @@ -416,6 +462,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "3_rendezvous.up.sql": &bintree{_3_rendezvousUpSql, map[string]*bintree{}}, "4_signed_peer_record.down.sql": &bintree{_4_signed_peer_recordDownSql, map[string]*bintree{}}, "4_signed_peer_record.up.sql": &bintree{_4_signed_peer_recordUpSql, map[string]*bintree{}}, + "5_nwaku_schema.down.sql": &bintree{_5_nwaku_schemaDownSql, map[string]*bintree{}}, + "5_nwaku_schema.up.sql": &bintree{_5_nwaku_schemaUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/waku/persistence/sqlite/migrations/sql/5_nwaku_schema.down.sql b/waku/persistence/sqlite/migrations/sql/5_nwaku_schema.down.sql new file mode 100644 index 00000000..e1933087 --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/5_nwaku_schema.down.sql @@ -0,0 +1,25 @@ +ALTER TABLE message RENAME TO message_old; +DROP INDEX i_ts; +DROP INDEX i_query; + +CREATE TABLE message ( + id BLOB, + receiverTimestamp INTEGER NOT NULL, + senderTimestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL DEFAULT 0, + CONSTRAINT messageIndex PRIMARY KEY (id, pubsubTopic) +) WITHOUT ROWID; + +CREATE INDEX message_senderTimestamp ON message(senderTimestamp); +CREATE INDEX message_receiverTimestamp ON message(receiverTimestamp); +CREATE INDEX i_msg_1 ON message(contentTopic ASC, pubsubTopic ASC, senderTimestamp ASC, id ASC); +CREATE INDEX i_msg_2 ON message(contentTopic DESC, pubsubTopic DESC, senderTimestamp DESC, id DESC); + +INSERT INTO message(id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) +SELECT id, storedAt, timestamp, contentTOpic, pubsubTopic, payload, version +FROM message_old; + +DROP TABLE message_old; diff --git a/waku/persistence/sqlite/migrations/sql/5_nwaku_schema.up.sql b/waku/persistence/sqlite/migrations/sql/5_nwaku_schema.up.sql new file mode 100644 index 00000000..c2ad9dab --- /dev/null +++ b/waku/persistence/sqlite/migrations/sql/5_nwaku_schema.up.sql @@ -0,0 +1,26 @@ +ALTER TABLE message RENAME TO message_old; + +DROP INDEX message_senderTimestamp; +DROP INDEX message_receiverTimestamp; +DROP INDEX i_msg_1; +DROP INDEX i_msg_2; + +CREATE TABLE message ( + pubsubTopic BLOB NOT NULL, + contentTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + id BLOB, + messageHash BLOB, -- Newly added, this will be populated with a counter value + storedAt INTEGER NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (messageHash) +) WITHOUT ROWID; +CREATE INDEX i_ts ON Message (storedAt); +CREATE INDEX i_query ON Message (contentTopic, pubsubTopic, storedAt, id); + +INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) +SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, id, receiverTimestamp +FROM message_old; + +DROP TABLE message_old; diff --git a/waku/persistence/sqlite/nwaku.go b/waku/persistence/sqlite/nwaku.go new file mode 100644 index 00000000..47dcea36 --- /dev/null +++ b/waku/persistence/sqlite/nwaku.go @@ -0,0 +1,82 @@ +package sqlite + +import ( + "database/sql" + "errors" + "fmt" +) + +const minSupportedNWAKUversion = 8 +const maxSupportedNWAKUversion = 8 + +func handleNWakuPreMigration(db *sql.DB) (bool, error) { + // Check if there's an user version in the DB, and if migration table does not exist. + // Rename existing table, and move data afterwards + + var nwakuDBVersion int + err := db.QueryRow("PRAGMA user_version").Scan(&nwakuDBVersion) + if err != nil { + return false, fmt.Errorf("could not obtain sqlite user_version while attempting to migrate nwaku database: %w", err) + } + + var gowakuDBVersion int + err = db.QueryRow(`SELECT COUNT(*) FROM schema_migrations`).Scan(&gowakuDBVersion) + if err != nil && errors.Is(err, sql.ErrNoRows) { + return false, fmt.Errorf("could not obtain schema_migrations data while attempting to migrate nwaku database: %w", err) + } + + if nwakuDBVersion == 0 { + // not a nwaku db + return false, nil + } + + if nwakuDBVersion < minSupportedNWAKUversion || nwakuDBVersion > maxSupportedNWAKUversion { + err = fmt.Errorf("unsupported nwaku DB %d - Supported versions [%d,%d]", nwakuDBVersion, minSupportedNWAKUversion, maxSupportedNWAKUversion) + return false, err + } + + if gowakuDBVersion > 0 { + // We have already migrated this database + return false, nil + } + + _, err = db.Exec("ALTER TABLE message RENAME TO message_nwaku") + if err != nil { + return false, fmt.Errorf("could not rename nwaku message table while attempting to migrate nwaku database: %w", err) + } + + _, err = db.Exec("DROP INDEX i_ts;") + if err != nil { + return false, fmt.Errorf("could not drop indexes while attempting to migrate nwaku database: %w", err) + } + + _, err = db.Exec("DROP INDEX i_query;") + if err != nil { + return false, fmt.Errorf("could not drop indexes while attempting to migrate nwaku database: %w", err) + } + + return true, nil +} + +func handleNWakuPostMigration(db *sql.DB) error { + _, err := db.Exec("INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) SELECT pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt FROM message_nwaku") + if err != nil { + return fmt.Errorf("could not migrate nwaku messages: %w", err) + } + + _, err = db.Exec("DROP TABLE message_nwaku") + if err != nil { + return fmt.Errorf("could not drop nwaku message table: %w", err) + } + + return nil +} + +func revertNWakuPreMigration(db *sql.DB) error { + _, err := db.Exec("ALTER TABLE message_nwaku RENAME TO message") + if err != nil { + return fmt.Errorf("could not revert changes to nwaku db: %w", err) + } + + return nil +} diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index b902294b..bb94215a 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -45,17 +45,43 @@ func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) { func migrationDriver(db *sql.DB) (database.Driver, error) { return sqlite3.WithInstance(db, &sqlite3.Config{ - MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, + MigrationsTable: sqlite3.DefaultMigrationsTable, }) } // Migrations is the function used for DB migration with sqlite driver -func Migrations(db *sql.DB) error { +func Migrations(db *sql.DB, logger *zap.Logger) error { migrationDriver, err := migrationDriver(db) if err != nil { return err } - return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset) + + isNwaku, err := handleNWakuPreMigration(db) + if err != nil { + return err + } + + err = migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset) + if err != nil { + if isNwaku { + migrationErr := migrate.MigrateDown(db, migrationDriver, migrations.AssetNames(), migrations.Asset) + if migrationErr != nil { + logger.Fatal("could not revert table changes in nwaku database", zap.Error(err)) + } + revertErr := revertNWakuPreMigration(db) + if revertErr != nil { + logger.Fatal("could not revert table changes in nwaku database", zap.Error(err)) + } + } + + return err + } + + if isNwaku { + return handleNWakuPostMigration(db) + } + + return nil } // CreateTable creates the table that will persist the peers diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 8992b2bd..574d1877 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -49,7 +49,7 @@ type DBStore struct { MessageProvider db *sql.DB - migrationFn func(db *sql.DB) error + migrationFn func(db *sql.DB, logger *zap.Logger) error metrics Metrics timesource timesource.Timesource @@ -121,7 +121,7 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { } } -type MigrationFn func(db *sql.DB) error +type MigrationFn func(db *sql.DB, logger *zap.Logger) error // WithMigrations is a DBOption used to determine if migrations should // be executed, and what driver to use @@ -157,7 +157,7 @@ func NewDBStore(reg prometheus.Registerer, log *zap.Logger, options ...DBOption) } if result.enableMigrations { - err := result.migrationFn(result.db) + err := result.migrationFn(result.db, log) if err != nil { return nil, err } @@ -211,7 +211,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error { // Delete older messages if d.maxDuration > 0 { start := time.Now() - sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1` + sqlStmt := `DELETE FROM message WHERE storedAt < $1` _, err := d.db.Exec(sqlStmt, d.timesource.Now().Add(-d.maxDuration).UnixNano()) if err != nil { d.metrics.RecordError(retPolicyFailure) @@ -240,7 +240,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error { } func (d *DBStore) getDeleteOldRowsQuery() string { - sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC %s OFFSET $1)` + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY storedAt DESC %s OFFSET $1)` switch GetDriverType(d.db) { case SQLiteDriver: sqlStmt = fmt.Sprintf(sqlStmt, "LIMIT -1") @@ -282,7 +282,12 @@ func (d *DBStore) Stop() { // Validate validates the message to be stored against possible fradulent conditions. func (d *DBStore) Validate(env *protocol.Envelope) error { - n := time.Unix(0, env.Index().ReceiverTime) + timestamp := env.Message().GetTimestamp() + if timestamp == 0 { + return nil + } + + n := time.Unix(0, timestamp) upperBound := n.Add(MaxTimeVariance) lowerBound := n.Add(-MaxTimeVariance) @@ -300,17 +305,20 @@ func (d *DBStore) Validate(env *protocol.Envelope) error { // Put inserts a WakuMessage into the DB func (d *DBStore) Put(env *protocol.Envelope) error { - stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)") + + stmt, err := d.db.Prepare("INSERT INTO message (id, messageHash, storedAt, timestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)") if err != nil { d.metrics.RecordError(insertFailure) return err } - cursor := env.Index() - dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest) + storedAt := env.Message().GetTimestamp() + if storedAt == 0 { + storedAt = env.Index().ReceiverTime + } start := time.Now() - _, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion()) + _, err = stmt.Exec(env.Index().Digest, env.Hash(), storedAt, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion()) if err != nil { return err } @@ -329,36 +337,33 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi usesCursor := false if query.PagingInfo.Cursor != nil { usesCursor = true + var exists bool - cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) - - err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)", - cursorDBKey.Bytes(), + err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE storedAt = $1 AND id = $2)", + query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest, ).Scan(&exists) - if err != nil { return nil, nil, err } - if exists { - eqOp := ">" - if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { - eqOp = "<" - } - *paramCnt++ - conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, *paramCnt)) - - parameters = append(parameters, cursorDBKey.Bytes()) - } else { + if !exists { return nil, nil, ErrInvalidCursor } + + eqOp := ">" + if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { + eqOp = "<" + } + conditions = append(conditions, fmt.Sprintf("(storedAt, id) %s ($%d, $%d)", eqOp, *paramCnt+1, *paramCnt+2)) + *paramCnt += 2 + + parameters = append(parameters, query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest) } handleTimeParam := func(time int64, op string) { *paramCnt++ - conditions = append(conditions, fmt.Sprintf("id %s $%d", op, *paramCnt)) - timeDBKey := NewDBKey(uint64(time), 0, "", []byte{}) - parameters = append(parameters, timeDBKey.Bytes()) + conditions = append(conditions, fmt.Sprintf("storedAt %s $%d", op, *paramCnt)) + parameters = append(parameters, time) } startTime := query.GetStartTime() @@ -378,10 +383,10 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi } func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}, error) { - sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version + sqlQuery := `SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version FROM message %s - ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s ` + ORDER BY timestamp %s, id %s, pubsubTopic %s, storedAt %s ` var conditions []string //var parameters []interface{} @@ -428,7 +433,7 @@ func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{} parameters = append(parameters, pageSize) sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection) - d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery)) + d.log.Debug(fmt.Sprintf("sqlQuery: %s", sqlQuery)) return sqlQuery, parameters, nil @@ -490,12 +495,12 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err return cursor, result, nil } -// MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp +// MostRecentTimestamp returns an unix timestamp with the most recent timestamp // in the message table func (d *DBStore) MostRecentTimestamp() (int64, error) { result := sql.NullInt64{} - err := d.db.QueryRow(`SELECT max(senderTimestamp) FROM message`).Scan(&result) + err := d.db.QueryRow(`SELECT max(timestamp) FROM message`).Scan(&result) if err != nil && err != sql.ErrNoRows { return 0, err } @@ -520,7 +525,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { d.log.Info("loading records from the DB", zap.Duration("duration", elapsed)) }() - rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") + rows, err := d.db.Query("SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY timestamp ASC") if err != nil { return nil, err } @@ -550,14 +555,14 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { // GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage` func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) { var id []byte - var receiverTimestamp int64 - var senderTimestamp int64 + var storedAt int64 + var timestamp int64 var contentTopic string var payload []byte var version uint32 var pubsubTopic string - err := row.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version) + err := row.Scan(&id, &storedAt, ×tamp, &contentTopic, &pubsubTopic, &payload, &version) if err != nil { d.log.Error("scanning messages from db", zap.Error(err)) return StoredMessage{}, err @@ -567,8 +572,8 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) { msg.ContentTopic = contentTopic msg.Payload = payload - if senderTimestamp != 0 { - msg.Timestamp = proto.Int64(senderTimestamp) + if timestamp != 0 { + msg.Timestamp = proto.Int64(timestamp) } if version > 0 { @@ -578,7 +583,7 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) { record := StoredMessage{ ID: id, PubsubTopic: pubsubTopic, - ReceiverTime: receiverTimestamp, + ReceiverTime: storedAt, Message: msg, } diff --git a/waku/persistence/utils/db.go b/waku/persistence/utils/db.go index 57a3c0e6..14d777ea 100644 --- a/waku/persistence/utils/db.go +++ b/waku/persistence/utils/db.go @@ -26,9 +26,9 @@ type DBSettings struct { } // ParseURL will return a database connection, and migration function that should be used depending on a database connection string -func ParseURL(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { +func ParseURL(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB, *zap.Logger) error, error) { var db *sql.DB - var migrationFn func(*sql.DB) error + var migrationFn func(*sql.DB, *zap.Logger) error var err error logger = logger.Named("db-setup") diff --git a/waku/persistence/utils/store_test.go b/waku/persistence/utils/store_test.go index d8896b81..fee14f65 100644 --- a/waku/persistence/utils/store_test.go +++ b/waku/persistence/utils/store_test.go @@ -20,13 +20,14 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" "google.golang.org/protobuf/proto" ) func TestStore(t *testing.T) { tests := []struct { name string - fn func(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) + fn func(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) }{ {"testDbStore", testDbStore}, {"testStoreRetention", testStoreRetention}, @@ -43,7 +44,7 @@ func TestStore(t *testing.T) { } } -func getDB(driver string) (*sql.DB, func(*sql.DB) error) { +func getDB(driver string) (*sql.DB, func(*sql.DB, *zap.Logger) error) { switch driver { case "postgres": return postgres.NewMockPgDB(), postgres.Migrations @@ -52,7 +53,7 @@ func getDB(driver string) (*sql.DB, func(*sql.DB) error) { } return nil, nil } -func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { +func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) { store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn)) require.NoError(t, err) @@ -71,7 +72,7 @@ func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { require.NotEmpty(t, res) } -func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { +func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) { store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) @@ -112,7 +113,7 @@ func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) erro require.Equal(t, msgCount, 3) } -func testQuery(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { +func testQuery(t *testing.T, db *sql.DB, migrationFn func(*sql.DB, *zap.Logger) error) { store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index d354e3c7..f1709f0c 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -48,7 +48,7 @@ func TestRendezvous(t *testing.T) { db, err := sqlite.NewDB(":memory:", utils.Logger()) require.NoError(t, err) - err = sqlite.Migrations(db) + err = sqlite.Migrations(db, utils.Logger()) require.NoError(t, err) rdb := NewDB(db, utils.Logger())