diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index 1f4be9ba8..33f58f40f 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -28,8 +28,8 @@ proc newTestDatabase(): SqliteDatabase = proc now(): Timestamp = getNanosecondTime(getTime().toUnixFloat()) -proc getTestTimestamp(offset=0): Timestamp = - Timestamp(getNanosecondTime(getTime().toUnixFloat() + offset.float)) +proc ts(offset=0, origin=now()): Timestamp = + origin + getNanosecondTime(offset) proc fakeWakuMessage( payload = "TEST-PAYLOAD", @@ -113,19 +113,19 @@ suite "SQLite message store - insert messages": retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) let messages = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), - fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6) + fakeWakuMessage(contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(3)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(5)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(6)) ] ## When for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require retentionPolicy.execute(store).isOk() ## Then @@ -150,8 +150,8 @@ suite "Message Store": store = SqliteStore.init(database).get() let - t1 = getTestTimestamp(0) - t2 = getTestTimestamp(1) + t1 = ts(0) + t2 = ts(1) t3 = high(int64) var msgs = @[ @@ -265,21 +265,18 @@ suite "Message Store": ver.isErr == false ver.value == 10 + # TODO: Move this test case to retention policy test suite test "number of messages retrieved by getAll is bounded by storeCapacity": - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - capacity = 10 + let capacity = 10 let database = newTestDatabase() store = SqliteStore.init(database).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) - for i in 1..capacity: - let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) - require store.put(pubsubTopic, msg).isOk() + let msg = WakuMessage(payload: @[byte i], contentTopic: DefaultContentTopic, version: 0, timestamp: Timestamp(i)) + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require retentionPolicy.execute(store).isOk() ## Then @@ -297,6 +294,7 @@ suite "Message Store": ## Cleanup store.close() + # TODO: Move this test case to retention policy test suite test "DB store capacity": let contentTopic = ContentTopic("/waku/2/default-content/proto") diff --git a/waku/v2/node/storage/message/sqlite_store/queries.nim b/waku/v2/node/storage/message/sqlite_store/queries.nim index 317def711..31aa444dd 100644 --- a/waku/v2/node/storage/message/sqlite_store/queries.nim +++ b/waku/v2/node/storage/message/sqlite_store/queries.nim @@ -7,7 +7,6 @@ import import ../../sqlite, ../../../../protocol/waku_message, - ../../../../protocol/waku_store/pagination, ../../../../utils/time @@ -15,6 +14,8 @@ const DbTable = "Message" type SqlQueryStr = string +type DbCursor* = (Timestamp, seq[byte], string) + ### SQLite column helper methods @@ -57,14 +58,14 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str proc createTableQuery(table: string): SqlQueryStr = "CREATE TABLE IF NOT EXISTS " & table & " (" & - " id BLOB," & - " storedAt INTEGER NOT NULL," & - " contentTopic BLOB NOT NULL," & " pubsubTopic BLOB NOT NULL," & + " contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," & - " senderTimestamp INTEGER NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" & + " timestamp INTEGER NOT NULL," & + " id BLOB," & + " storedAt INTEGER NOT NULL," & + " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & ") WITHOUT ROWID;" proc createTable*(db: SqliteDatabase): DatabaseResult[void] = @@ -97,7 +98,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp)" & + "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & " VALUES (?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = @@ -187,7 +188,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" & " FROM " & table & " ORDER BY storedAt ASC" @@ -227,7 +228,7 @@ proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[st contentTopicWhere &= ")" some(contentTopicWhere) -proc cursorWhereClause(cursor: Option[PagingIndex], ascending=true): Option[string] = +proc cursorWhereClause(cursor: Option[DbCursor], ascending=true): Option[string] = if cursor.isNone(): return none(string) @@ -273,13 +274,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u var query: string - query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" query &= " FROM " & table if where.isSome(): query &= " WHERE " & where.get() - query &= " ORDER BY storedAt " & order + query &= " ORDER BY storedAt " & order & ", id " & order & ", pubsubTopic " & order query &= " LIMIT " & $limit & ";" query @@ -292,7 +293,7 @@ proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): Datab proc execSelectMessagesWithLimitStmt(s: SqliteStmt, contentTopic: Option[seq[ContentTopic]], pubsubTopic: Option[string], - cursor: Option[PagingIndex], + cursor: Option[DbCursor], startTime: Option[Timestamp], endTime: Option[Timestamp], onRowCallback: DataProc): DatabaseResult[void] = @@ -302,21 +303,16 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt, var paramIndex = 1 if contentTopic.isSome(): for topic in contentTopic.get(): - let topicBlob = toBytes(topic) - checkErr bindParam(s, paramIndex, topicBlob) + checkErr bindParam(s, paramIndex, topic.toBytes()) paramIndex += 1 - if cursor.isSome(): # cursor = senderTimestamp, id, pubsubTopic - let senderTimestamp = cursor.get().senderTime - checkErr bindParam(s, paramIndex, senderTimestamp) + if cursor.isSome(): # cursor = storedAt, id, pubsubTopic + let (storedAt, id, pubsubTopic) = cursor.get() + checkErr bindParam(s, paramIndex, storedAt) paramIndex += 1 - - let id = @(cursor.get().digest.data) checkErr bindParam(s, paramIndex, id) paramIndex += 1 - - let pubsubTopic = toBytes(cursor.get().pubsubTopic) - checkErr bindParam(s, paramIndex, pubsubTopic) + checkErr bindParam(s, paramIndex, pubsubTopic.toBytes()) paramIndex += 1 if pubsubTopic.isSome(): @@ -353,7 +349,7 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt, proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, contentTopic: Option[seq[ContentTopic]], pubsubTopic: Option[string], - cursor: Option[PagingIndex], + cursor: Option[DbCursor], startTime: Option[Timestamp], endTime: Option[Timestamp], limit: uint64, diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 6d3817c4c..c14dcf8dc 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -98,6 +98,7 @@ method getMessagesByHistoryQuery*( maxPageSize = DefaultPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] = + let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic)) let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( contentTopic, diff --git a/waku/v2/node/storage/migration/migration_types.nim b/waku/v2/node/storage/migration/migration_types.nim index c006e19ac..ec16d9428 100644 --- a/waku/v2/node/storage/migration/migration_types.nim +++ b/waku/v2/node/storage/migration/migration_types.nim @@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message" const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" -const USER_VERSION* = 6 # increase this when there is an update in the database schema +const USER_VERSION* = 7 # increase this when there is an update in the database schema type MigrationScriptsResult*[T] = Result[T, string] type diff --git a/waku/v2/node/storage/migration/migrations_scripts/message/00007_updatePrimaryKey.up.sql b/waku/v2/node/storage/migration/migrations_scripts/message/00007_updatePrimaryKey.up.sql new file mode 100644 index 000000000..f1936c8d6 --- /dev/null +++ b/waku/v2/node/storage/migration/migrations_scripts/message/00007_updatePrimaryKey.up.sql @@ -0,0 +1,18 @@ +ALTER TABLE message RENAME TO message_backup; + +CREATE TABLE IF NOT EXISTS message( + pubsubTopic BLOB NOT NULL, + contentTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + id BLOB, + storedAt INTEGER NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic) +) WITHOUT ROWID; + +INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt) + SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, storedAt + FROM message_backup; + +DROP TABLE message_backup; \ No newline at end of file