diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index 80822cb77..8d1f6008c 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -19,13 +19,19 @@ const DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +proc ts(offset=0, origin=now()): Timestamp = + origin + getNanosecondTime(offset) + proc newTestDatabase(): SqliteDatabase = SqliteDatabase.init("", inMemory = true).tryGet() proc fakeWakuMessage( payload = "TEST-PAYLOAD", contentTopic = DefaultContentTopic, - ts = getNanosecondTime(epochTime()) + ts = now() ): WakuMessage = WakuMessage( payload: toBytes(payload), @@ -46,20 +52,20 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)), - fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)), ] for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -94,20 +100,20 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), - fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)), - fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)), ] for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -144,20 +150,20 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=ts(3)), - fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=ts(5)), + fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=ts(7)), ] for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -193,24 +199,24 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages1 = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)), ] for msg in messages1: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let messages2 = @[ - fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)), ] for msg in messages2: - require store.put(pubsubTopic, msg).isOk() + require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -246,20 +252,20 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)), - fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)), ] for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) @@ -297,20 +303,20 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), - fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), - fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)), - fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)), ] for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) @@ -349,22 +355,22 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages1 = @[ - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), - fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), - fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), - fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage(ts=ts(0)), + fakeWakuMessage(ts=ts(1)), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)), ] for msg in messages1: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let messages2 = @[ - fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), - fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)), ] for msg in messages2: - require store.put(pubsubTopic, msg).isOk() + require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) @@ -403,15 +409,15 @@ suite "message store - history query": store = SqliteStore.init(database).tryGet() let messages = @[ - fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2), - fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 3), - fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 4), - fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 5), - fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=ts(2)), + fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=ts(3)), + fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=ts(4)), + fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=ts(5)), + fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=ts(6)), ] for msg in messages: - require store.put(DefaultPubsubTopic, msg).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -431,6 +437,68 @@ suite "message store - history query": ## Teardown store.close() + + test "content topic and page size": + ## Given + let pageSize: uint64 = 50 + let + database = newTestDatabase() + store = SqliteStore.init(database).tryGet() + + for t in 0..<70: + let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t)) + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[DefaultContentTopic]), + maxPageSize=pageSize, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 50 + pagingInfo.isSome() + pagingInfo.get().pageSize == 50 + + ## Teardown + store.close() + + test "content topic and page size - not enough messages stored": + ## Given + let pageSize: uint64 = 50 + let + database = newTestDatabase() + store = SqliteStore.init(database).tryGet() + + for t in 0..<40: + let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t)) + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[DefaultContentTopic]), + maxPageSize=pageSize, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 40 + pagingInfo.isSome() + pagingInfo.get().pageSize == 40 + + ## Teardown + store.close() test "single content topic and valid time range": ## Given diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index afd5ba7b7..1414857eb 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -42,7 +42,7 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = MaxPageSize, + maxPageSize = DefaultPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] {.base.} = discard diff --git a/waku/v2/node/storage/message/sqlite_store/queries.nim b/waku/v2/node/storage/message/sqlite_store/queries.nim index 0c5d475d1..f313bfbd8 100644 --- a/waku/v2/node/storage/message/sqlite_store/queries.nim +++ b/waku/v2/node/storage/message/sqlite_store/queries.nim @@ -37,9 +37,9 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo timestamp: Timestamp(senderTimestamp) ) -proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp = - let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol) - Timestamp(receiverTimestamp) +proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp = + let storedAt = sqlite3_column_int64(s, storedAtCol) + Timestamp(storedAt) proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string = let @@ -58,7 +58,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str proc createTableQuery(table: string): SqlQueryStr = "CREATE TABLE IF NOT EXISTS " & table & " (" & " id BLOB," & - " receiverTimestamp INTEGER NOT NULL," & + " storedAt INTEGER NOT NULL," & " contentTopic BLOB NOT NULL," & " pubsubTopic BLOB NOT NULL," & " payload BLOB," & @@ -76,7 +76,7 @@ proc createTable*(db: SqliteDatabase): DatabaseResult[void] = ## Create indices proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr = - "CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);" + "CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);" proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] = let query = createOldestMessageTimestampIndexQuery(DbTable) @@ -85,7 +85,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = - "CREATE INDEX IF NOT EXISTS i_msg ON " & table & " (contentTopic, pubsubTopic, senderTimestamp, id);" + "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);" proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = let query = createHistoryQueryIndexQuery(DbTable) @@ -97,7 +97,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" & + "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp)" & " VALUES (?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = @@ -126,7 +126,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] = ## Get oldest message receiver timestamp proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr = - "SELECT MIN(receiverTimestamp) FROM " & table + "SELECT MIN(storedAt) FROM " & table proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= var timestamp: Timestamp @@ -143,7 +143,7 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam ## Get newest message receiver timestamp proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr = - "SELECT MAX(receiverTimestamp) FROM " & table + "SELECT MAX(storedAt) FROM " & table proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= var timestamp: Timestamp @@ -160,7 +160,7 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam ## Delete messages older than timestamp proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr = - "DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts + "DELETE FROM " & table & " WHERE storedAt < " & $ts proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] = let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts) @@ -173,7 +173,7 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = "DELETE FROM " & table & " WHERE id NOT IN (" & " SELECT id FROM " & table & - " ORDER BY receiverTimestamp DESC" & + " ORDER BY storedAt DESC" & " LIMIT " & $limit & ");" @@ -187,20 +187,20 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" & " FROM " & table & - " ORDER BY receiverTimestamp ASC" + " ORDER BY storedAt ASC" proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] = ## Retrieve all messages from the store. var rows: seq[(Timestamp, WakuMessage, string)] proc queryRowCallback(s: ptr sqlite3_stmt) = let - receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) - rows.add((receiverTimestamp, wakuMessage, pubsubTopic)) + rows.add((storedAt, wakuMessage, pubsubTopic)) let query = selectAllMessagesQuery(DbTable) let res = db.query(query, queryRowCallback) @@ -232,7 +232,7 @@ proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] = return none(string) let comp = if ascending: ">" else: "<" - let whereClause = "(senderTimestamp, id, pubsubTopic) " & comp & " (?, ?, ?)" + let whereClause = "(storedAt, id, pubsubTopic) " & comp & " (?, ?, ?)" some(whereClause) proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] = @@ -247,11 +247,11 @@ proc timeRangeWhereClause(startTime: Option[Timestamp], endTime: Option[Timestam var where = "(" if startTime.isSome(): - where &= "senderTimestamp >= (?)" + where &= "storedAt >= (?)" if startTime.isSome() and endTime.isSome(): where &= " AND " if endTime.isSome(): - where &= "senderTimestamp <= (?)" + where &= "storedAt <= (?)" where &= ")" some(where) @@ -273,13 +273,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u var query: string - query = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" query &= " FROM " & table if where.isSome(): query &= " WHERE " & where.get() - query &= " ORDER BY senderTimestamp " & order & ", id " & order & ", pubsubTopic " & order & ", receiverTimestamp " & order + query &= " ORDER BY storedAt " & order query &= " LIMIT " & $limit & ";" query @@ -363,11 +363,11 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, var messages: seq[(WakuMessage, Timestamp, string)] = @[] proc queryRowCallback(s: ptr sqlite3_stmt) = let - receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) - messages.add((message, receiverTimestamp, pubsubTopic)) + messages.add((message, storedAt, pubsubTopic)) let query = block: let diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 4376946e1..5a5d83929 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -66,12 +66,12 @@ method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: M let res = s.insertStmt.exec(( @(digest.data), # id - receivedTime, # receiverTimestamp - toBytes(message.contentTopic), # contentTopic + receivedTime, # storedAt + toBytes(message.contentTopic), # contentTopic message.payload, # payload - toBytes(pubsubTopic), # pubsubTopic + toBytes(pubsubTopic), # pubsubTopic int64(message.version), # version - message.timestamp # senderTimestamp + message.timestamp # senderTimestamp )) if res.isErr(): return err("message insert failed: " & res.error()) @@ -95,11 +95,9 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = MaxPageSize, + maxPageSize = DefaultPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] = - let pageSizeLimit = if maxPageSize <= 0: MaxPageSize - else: min(maxPageSize, MaxPageSize) let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( contentTopic, @@ -107,7 +105,7 @@ method getMessagesByHistoryQuery*( cursor, startTime, endTime, - limit=pageSizeLimit, + limit=maxPageSize, ascending=ascendingOrder ) @@ -118,8 +116,8 @@ method getMessagesByHistoryQuery*( # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message # Compute last message index - let (message, receivedTimestamp, pubsubTopic) = rows[^1] - let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic) + let (message, storedAt, pubsubTopic) = rows[^1] + let lastIndex = Index.compute(message, storedAt, pubsubTopic) let pagingInfo = PagingInfo( pageSize: uint64(messages.len), diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 824379fa6..043086c45 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -357,8 +357,7 @@ proc getPage*(storeQueue: StoreQueueRef, let cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! else: some(pagingInfo.cursor) - maxPageSize = if pagingInfo.pageSize <= 0: MaxPageSize - else: min(pagingInfo.pageSize, MaxPageSize) + maxPageSize = pagingInfo.pageSize case pagingInfo.direction of PagingDirection.FORWARD: @@ -384,7 +383,7 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = MaxPageSize, + maxPageSize = DefaultPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] = diff --git a/waku/v2/node/storage/migration/migration_types.nim b/waku/v2/node/storage/migration/migration_types.nim index c2dd304c6..c006e19ac 100644 --- a/waku/v2/node/storage/migration/migration_types.nim +++ b/waku/v2/node/storage/migration/migration_types.nim @@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message" const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" -const USER_VERSION* = 5 # increase this when there is an update in the database schema +const USER_VERSION* = 6 # increase this when there is an update in the database schema type MigrationScriptsResult*[T] = Result[T, string] type diff --git a/waku/v2/node/storage/migration/migrations_scripts/message/00006_renameColumn.up.sql b/waku/v2/node/storage/migration/migrations_scripts/message/00006_renameColumn.up.sql new file mode 100644 index 000000000..360218dcc --- /dev/null +++ b/waku/v2/node/storage/migration/migrations_scripts/message/00006_renameColumn.up.sql @@ -0,0 +1,11 @@ +ALTER TABLE message RENAME COLUMN receiverTimestamp TO storedAt; + + +DROP INDEX IF EXISTS i_msg; + +CREATE INDEX IF NOT EXISTS i_query ON message (contentTopic, pubsubTopic, storedAt, id); + + +DROP INDEX IF EXISTS i_rt; + +CREATE INDEX IF NOT EXISTS i_ts ON message (storedAt); diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 1efb3ac0c..3ca2c47eb 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -109,7 +109,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} else: none(Timestamp) qEndTime = if query.endTime != Timestamp(0): some(query.endTime) else: none(Timestamp) - qMaxPageSize = query.pagingInfo.pageSize + qMaxPageSize = if query.pagingInfo.pageSize <= 0: DefaultPageSize + else: min(query.pagingInfo.pageSize, MaxPageSize) qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD