From 95d31b3ed3760072efa2c3a467d9f9e81ed9610c Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 26 Jan 2023 10:19:58 +0100 Subject: [PATCH] fix(archive): reverse the db query results in the waku archive front-end --- .../waku_archive/test_driver_queue_query.nim | 16 +- .../waku_archive/test_driver_sqlite_query.nim | 16 +- tests/v2/waku_archive/test_waku_archive.nim | 178 +++++++----------- waku/v2/protocol/waku_archive/archive.nim | 42 +++-- .../driver/queue_driver/queue_driver.nim | 10 +- .../driver/sqlite_driver/sqlite_driver.nim | 8 +- 6 files changed, 113 insertions(+), 157 deletions(-) diff --git a/tests/v2/waku_archive/test_driver_queue_query.nim b/tests/v2/waku_archive/test_driver_queue_query.nim index 375b9617d..acf0b0477 100644 --- a/tests/v2/waku_archive/test_driver_queue_query.nim +++ b/tests/v2/waku_archive/test_driver_queue_query.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils, random], + std/[options, sequtils, random, algorithm], testutils/unittests, chronos, chronicles @@ -160,7 +160,7 @@ suite "Queue driver - query by content topic": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[6..7] + filteredMessages == expected[6..7].reversed() ## Cleanup driver.close().expect("driver to close") @@ -510,7 +510,7 @@ suite "Queue driver - query by cursor": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[2..3] + filteredMessages == expected[2..3].reversed() ## Cleanup driver.close().expect("driver to close") @@ -600,7 +600,7 @@ suite "Queue driver - query by cursor": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[2..5] + filteredMessages == expected[2..5].reversed() ## Cleanup driver.close().expect("driver to close") @@ -706,7 +706,7 @@ suite "Queue driver - query by cursor": let expectedMessages = expected.mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expectedMessages[4..5] + filteredMessages == expectedMessages[4..5].reversed() ## Cleanup driver.close().expect("driver to close") @@ -980,7 +980,7 @@ suite "Queue driver - query by time range": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[2..6] + filteredMessages == expected[2..6].reversed() ## Cleanup driver.close().expect("driver to close") @@ -1078,7 +1078,7 @@ suite "Queue driver - query by time range": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[3..4] + filteredMessages == expected[3..4].reversed() ## Cleanup driver.close().expect("driver to close") @@ -1188,7 +1188,7 @@ suite "Queue driver - query by time range": let expectedMessages = expected.mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expectedMessages[4..5] + filteredMessages == expectedMessages[4..5].reversed() ## Cleanup driver.close().expect("driver to close") diff --git a/tests/v2/waku_archive/test_driver_sqlite_query.nim b/tests/v2/waku_archive/test_driver_sqlite_query.nim index cb2a00095..9ab6a233e 100644 --- a/tests/v2/waku_archive/test_driver_sqlite_query.nim +++ b/tests/v2/waku_archive/test_driver_sqlite_query.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils, random], + std/[options, sequtils, random, algorithm], testutils/unittests, chronos, chronicles @@ -164,7 +164,7 @@ suite "SQLite driver - query by content topic": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[6..7] + filteredMessages == expected[6..7].reversed() ## Cleanup driver.close().expect("driver to close") @@ -514,7 +514,7 @@ suite "SQLite driver - query by cursor": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[2..3] + filteredMessages == expected[2..3].reversed() ## Cleanup driver.close().expect("driver to close") @@ -604,7 +604,7 @@ suite "SQLite driver - query by cursor": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[2..5] + filteredMessages == expected[2..5].reversed() ## Cleanup driver.close().expect("driver to close") @@ -710,7 +710,7 @@ suite "SQLite driver - query by cursor": let expectedMessages = expected.mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expectedMessages[4..5] + filteredMessages == expectedMessages[4..5].reversed() ## Cleanup driver.close().expect("driver to close") @@ -984,7 +984,7 @@ suite "SQLite driver - query by time range": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[2..6] + filteredMessages == expected[2..6].reversed() ## Cleanup driver.close().expect("driver to close") @@ -1082,7 +1082,7 @@ suite "SQLite driver - query by time range": let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expected[3..4] + filteredMessages == expected[3..4].reversed() ## Cleanup driver.close().expect("driver to close") @@ -1192,7 +1192,7 @@ suite "SQLite driver - query by time range": let expectedMessages = expected.mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1]) check: - filteredMessages == expectedMessages[4..5] + filteredMessages == expectedMessages[4..5].reversed() ## Cleanup driver.close().expect("driver to close") diff --git a/tests/v2/waku_archive/test_waku_archive.nim b/tests/v2/waku_archive/test_waku_archive.nim index de0520c36..5a76d0f72 100644 --- a/tests/v2/waku_archive/test_waku_archive.nim +++ b/tests/v2/waku_archive/test_waku_archive.nim @@ -12,8 +12,7 @@ import ../../../waku/v2/protocol/waku_archive/driver/sqlite_driver, ../../../waku/v2/protocol/waku_archive, ../../../waku/v2/utils/time, - ../testlib/common, - ../testlib/switch + ../testlib/common proc newTestDatabase(): SqliteDatabase = @@ -27,6 +26,15 @@ proc newTestWakuArchive(driver: ArchiveDriver): WakuArchive = let validator: MessageValidator = DefaultMessageValidator() WakuArchive.new(driver, validator=some(validator)) +proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor = + ArchiveCursor( + pubsubTopic: pubsubTopic, + senderTime: message.timestamp, + storeTime: message.timestamp, + digest: computeDigest(message) + ) + + suite "Waku Archive - message handling": @@ -126,25 +134,25 @@ suite "Waku Archive - message handling": procSuite "Waku Archive - find messages": ## Fixtures let timeOrigin = now() + let msgListA = @[ + fakeWakuMessage(@[byte 00], contentTopic=ContentTopic("2"), ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], contentTopic=ContentTopic("1"), ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], contentTopic=ContentTopic("2"), ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], contentTopic=ContentTopic("1"), ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], contentTopic=ContentTopic("2"), ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], contentTopic=ContentTopic("1"), ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], contentTopic=ContentTopic("2"), ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], contentTopic=ContentTopic("1"), ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], contentTopic=ContentTopic("2"), ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], contentTopic=ContentTopic("1"), ts=ts(90, timeOrigin)) + ] + let archiveA = block: let driver = newTestArchiveDriver() archive = newTestWakuArchive(driver) - let msgList = @[ - fakeWakuMessage(@[byte 00], contentTopic=ContentTopic("2"), ts=ts(00, timeOrigin)), - fakeWakuMessage(@[byte 01], contentTopic=ContentTopic("1"), ts=ts(10, timeOrigin)), - fakeWakuMessage(@[byte 02], contentTopic=ContentTopic("2"), ts=ts(20, timeOrigin)), - fakeWakuMessage(@[byte 03], contentTopic=ContentTopic("1"), ts=ts(30, timeOrigin)), - fakeWakuMessage(@[byte 04], contentTopic=ContentTopic("2"), ts=ts(40, timeOrigin)), - fakeWakuMessage(@[byte 05], contentTopic=ContentTopic("1"), ts=ts(50, timeOrigin)), - fakeWakuMessage(@[byte 06], contentTopic=ContentTopic("2"), ts=ts(60, timeOrigin)), - fakeWakuMessage(@[byte 07], contentTopic=ContentTopic("1"), ts=ts(70, timeOrigin)), - fakeWakuMessage(@[byte 08], contentTopic=ContentTopic("2"), ts=ts(80, timeOrigin)), - fakeWakuMessage(@[byte 09], contentTopic=ContentTopic("1"), ts=ts(90, timeOrigin)) - ] - - for msg in msgList: + for msg in msgListA: require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() archive @@ -348,120 +356,76 @@ procSuite "Waku Archive - find messages": response.messages.anyIt(it == msg3) test "handle query with forward pagination": - ## Setup - let - driver = newTestArchiveDriver() - archive = newTestWakuArchive(driver) - - let currentTime = now() - let msgList = @[ - fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=currentTime - 9), - fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=currentTime - 8), - fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=currentTime - 7), - fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=currentTime - 6), - fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=currentTime - 5), - fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=currentTime - 4), - fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic, ts=currentTime - 3), - fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic, ts=currentTime - 2), - fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic, ts=currentTime - 1), - fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=currentTime) - ] - - for msg in msgList: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - ## Given - var req = ArchiveQuery( - contentTopics: @[DefaultContentTopic], - pageSize: 2, + let req = ArchiveQuery( + pageSize: 4, ascending: true ) ## When - var res = archive.findMessages(req) - require res.isOk() + var nextReq = req # copy - var - response = res.tryGet() - totalMessages = response.messages.len() - totalQueries = 1 + var pages = newSeq[seq[WakuMessage]](3) + var cursors = newSeq[Option[ArchiveCursor]](3) - while response.cursor.isSome(): - require: - totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever - response.messages.len() == 2 - - req.cursor = response.cursor - - # Continue querying - res = archive.findMessages(req) + for i in 0..<3: + let res = archiveA.findMessages(nextReq) require res.isOk() - response = res.tryGet() - totalMessages += response.messages.len() - totalQueries += 1 + + # Keep query response content + let response = res.get() + pages[i] = response.messages + cursors[i] = response.cursor + + # Set/update the request cursor + nextReq.cursor = cursors[i] ## Then check: - totalQueries == 4 # 4 queries of pageSize 2 - totalMessages == 8 # 8 messages in total + cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[3])) + cursors[1] == some(computeTestCursor(DefaultPubsubTopic, msgListA[7])) + cursors[2] == none(ArchiveCursor) + + check: + pages[0] == msgListA[0..3] + pages[1] == msgListA[4..7] + pages[2] == msgListA[8..9] test "handle query with backward pagination": - ## Setup - let - driver = newTestArchiveDriver() - archive = newTestWakuArchive(driver) - - let currentTime = now() - let msgList = @[ - fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=currentTime - 9), - fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=currentTime - 8), - fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=currentTime - 7), - fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=currentTime - 6), - fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=currentTime - 5), - fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=currentTime - 4), - fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic, ts=currentTime - 3), - fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic, ts=currentTime - 2), - fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic, ts=currentTime - 1), - fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=currentTime) - ] - - for msg in msgList: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - ## Given - var req = ArchiveQuery( - contentTopics: @[DefaultContentTopic], - pageSize: 2, - ascending: false + let req = ArchiveQuery( + pageSize: 4, + ascending: false # backward ) ## When - var res = archive.findMessages(req) - require res.isOk() + var nextReq = req # copy - var - response = res.tryGet() - totalMessages = response.messages.len() - totalQueries = 1 + var pages = newSeq[seq[WakuMessage]](3) + var cursors = newSeq[Option[ArchiveCursor]](3) - while response.cursor.isSome(): - require: - totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever - response.messages.len() == 2 - - req.cursor = response.cursor - - # Continue querying - res = archive.findMessages(req) + for i in 0..<3: + let res = archiveA.findMessages(nextReq) require res.isOk() - response = res.tryGet() - totalMessages += response.messages.len() - totalQueries += 1 + + # Keep query response content + let response = res.get() + pages[i] = response.messages + cursors[i] = response.cursor + + # Set/update the request cursor + nextReq.cursor = cursors[i] ## Then check: - totalQueries == 4 # 4 queries of pageSize 2 - totalMessages == 8 # 8 messages in total + cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[6])) + cursors[1] == some(computeTestCursor(DefaultPubsubTopic, msgListA[2])) + cursors[2] == none(ArchiveCursor) + + check: + pages[0] == msgListA[6..9] + pages[1] == msgListA[2..5] + pages[2] == msgListA[0..1] test "handle query with no paging info - auto-pagination": ## Setup diff --git a/waku/v2/protocol/waku_archive/archive.nim b/waku/v2/protocol/waku_archive/archive.nim index 1fc38e307..2d5f9e7c8 100644 --- a/waku/v2/protocol/waku_archive/archive.nim +++ b/waku/v2/protocol/waku_archive/archive.nim @@ -5,7 +5,7 @@ else: import - std/[tables, times, sequtils, options], + std/[tables, times, sequtils, options, algorithm], stew/results, chronicles, chronos, @@ -129,6 +129,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe. if qContentTopics.len > 10: return err(ArchiveError.invalidQuery("too many content topics")) + let queryStartTime = getTime().toUnixFloat() let queryRes = w.driver.getMessages( @@ -145,25 +146,23 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe. waku_archive_query_duration_seconds.observe(queryDuration) - # Build response if queryRes.isErr(): return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error)) let rows = queryRes.get() - if rows.len <= 0: - return ok(ArchiveResponse( - messages: @[], - cursor: none(ArchiveCursor) - )) - - - # TODO: Move cursor generation to the driver implementation module - var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1]) - else: rows[0..^2].mapIt(it[1]) + var messages = newSeq[WakuMessage]() var cursor = none(ArchiveCursor) + if rows.len == 0: + return ok(ArchiveResponse(messages: messages, cursor: cursor)) + + ## Messages + let pageSize = min(rows.len, int(qMaxPageSize)) + messages = rows[0.. int(qMaxPageSize): ## Build last message cursor ## The cursor is built from the last message INCLUDED in the response @@ -171,22 +170,25 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe. let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] # TODO: Improve coherence of MessageDigest type - var messageDigest: array[32, byte] - for i in 0..