fix(archive): reverse the db query results in the waku archive front-end

This commit is contained in:
Lorenzo Delgado 2023-01-26 10:19:58 +01:00 committed by GitHub
parent 8c7a931f65
commit 95d31b3ed3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 157 deletions

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[options, sequtils, random], std/[options, sequtils, random, algorithm],
testutils/unittests, testutils/unittests,
chronos, chronos,
chronicles chronicles
@ -160,7 +160,7 @@ suite "Queue driver - query by content topic":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[6..7] filteredMessages == expected[6..7].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -510,7 +510,7 @@ suite "Queue driver - query by cursor":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[2..3] filteredMessages == expected[2..3].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -600,7 +600,7 @@ suite "Queue driver - query by cursor":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[2..5] filteredMessages == expected[2..5].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -706,7 +706,7 @@ suite "Queue driver - query by cursor":
let expectedMessages = expected.mapIt(it[1]) let expectedMessages = expected.mapIt(it[1])
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expectedMessages[4..5] filteredMessages == expectedMessages[4..5].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -980,7 +980,7 @@ suite "Queue driver - query by time range":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[2..6] filteredMessages == expected[2..6].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -1078,7 +1078,7 @@ suite "Queue driver - query by time range":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[3..4] filteredMessages == expected[3..4].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -1188,7 +1188,7 @@ suite "Queue driver - query by time range":
let expectedMessages = expected.mapIt(it[1]) let expectedMessages = expected.mapIt(it[1])
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expectedMessages[4..5] filteredMessages == expectedMessages[4..5].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[options, sequtils, random], std/[options, sequtils, random, algorithm],
testutils/unittests, testutils/unittests,
chronos, chronos,
chronicles chronicles
@ -164,7 +164,7 @@ suite "SQLite driver - query by content topic":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[6..7] filteredMessages == expected[6..7].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -514,7 +514,7 @@ suite "SQLite driver - query by cursor":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[2..3] filteredMessages == expected[2..3].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -604,7 +604,7 @@ suite "SQLite driver - query by cursor":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[2..5] filteredMessages == expected[2..5].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -710,7 +710,7 @@ suite "SQLite driver - query by cursor":
let expectedMessages = expected.mapIt(it[1]) let expectedMessages = expected.mapIt(it[1])
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expectedMessages[4..5] filteredMessages == expectedMessages[4..5].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -984,7 +984,7 @@ suite "SQLite driver - query by time range":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[2..6] filteredMessages == expected[2..6].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -1082,7 +1082,7 @@ suite "SQLite driver - query by time range":
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expected[3..4] filteredMessages == expected[3..4].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")
@ -1192,7 +1192,7 @@ suite "SQLite driver - query by time range":
let expectedMessages = expected.mapIt(it[1]) let expectedMessages = expected.mapIt(it[1])
let filteredMessages = res.tryGet().mapIt(it[1]) let filteredMessages = res.tryGet().mapIt(it[1])
check: check:
filteredMessages == expectedMessages[4..5] filteredMessages == expectedMessages[4..5].reversed()
## Cleanup ## Cleanup
driver.close().expect("driver to close") driver.close().expect("driver to close")

View File

@ -12,8 +12,7 @@ import
../../../waku/v2/protocol/waku_archive/driver/sqlite_driver, ../../../waku/v2/protocol/waku_archive/driver/sqlite_driver,
../../../waku/v2/protocol/waku_archive, ../../../waku/v2/protocol/waku_archive,
../../../waku/v2/utils/time, ../../../waku/v2/utils/time,
../testlib/common, ../testlib/common
../testlib/switch
proc newTestDatabase(): SqliteDatabase = proc newTestDatabase(): SqliteDatabase =
@ -27,6 +26,15 @@ proc newTestWakuArchive(driver: ArchiveDriver): WakuArchive =
let validator: MessageValidator = DefaultMessageValidator() let validator: MessageValidator = DefaultMessageValidator()
WakuArchive.new(driver, validator=some(validator)) WakuArchive.new(driver, validator=some(validator))
proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
)
suite "Waku Archive - message handling": suite "Waku Archive - message handling":
@ -126,25 +134,25 @@ suite "Waku Archive - message handling":
procSuite "Waku Archive - find messages": procSuite "Waku Archive - find messages":
## Fixtures ## Fixtures
let timeOrigin = now() let timeOrigin = now()
let msgListA = @[
fakeWakuMessage(@[byte 00], contentTopic=ContentTopic("2"), ts=ts(00, timeOrigin)),
fakeWakuMessage(@[byte 01], contentTopic=ContentTopic("1"), ts=ts(10, timeOrigin)),
fakeWakuMessage(@[byte 02], contentTopic=ContentTopic("2"), ts=ts(20, timeOrigin)),
fakeWakuMessage(@[byte 03], contentTopic=ContentTopic("1"), ts=ts(30, timeOrigin)),
fakeWakuMessage(@[byte 04], contentTopic=ContentTopic("2"), ts=ts(40, timeOrigin)),
fakeWakuMessage(@[byte 05], contentTopic=ContentTopic("1"), ts=ts(50, timeOrigin)),
fakeWakuMessage(@[byte 06], contentTopic=ContentTopic("2"), ts=ts(60, timeOrigin)),
fakeWakuMessage(@[byte 07], contentTopic=ContentTopic("1"), ts=ts(70, timeOrigin)),
fakeWakuMessage(@[byte 08], contentTopic=ContentTopic("2"), ts=ts(80, timeOrigin)),
fakeWakuMessage(@[byte 09], contentTopic=ContentTopic("1"), ts=ts(90, timeOrigin))
]
let archiveA = block: let archiveA = block:
let let
driver = newTestArchiveDriver() driver = newTestArchiveDriver()
archive = newTestWakuArchive(driver) archive = newTestWakuArchive(driver)
let msgList = @[ for msg in msgListA:
fakeWakuMessage(@[byte 00], contentTopic=ContentTopic("2"), ts=ts(00, timeOrigin)),
fakeWakuMessage(@[byte 01], contentTopic=ContentTopic("1"), ts=ts(10, timeOrigin)),
fakeWakuMessage(@[byte 02], contentTopic=ContentTopic("2"), ts=ts(20, timeOrigin)),
fakeWakuMessage(@[byte 03], contentTopic=ContentTopic("1"), ts=ts(30, timeOrigin)),
fakeWakuMessage(@[byte 04], contentTopic=ContentTopic("2"), ts=ts(40, timeOrigin)),
fakeWakuMessage(@[byte 05], contentTopic=ContentTopic("1"), ts=ts(50, timeOrigin)),
fakeWakuMessage(@[byte 06], contentTopic=ContentTopic("2"), ts=ts(60, timeOrigin)),
fakeWakuMessage(@[byte 07], contentTopic=ContentTopic("1"), ts=ts(70, timeOrigin)),
fakeWakuMessage(@[byte 08], contentTopic=ContentTopic("2"), ts=ts(80, timeOrigin)),
fakeWakuMessage(@[byte 09], contentTopic=ContentTopic("1"), ts=ts(90, timeOrigin))
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
archive archive
@ -348,120 +356,76 @@ procSuite "Waku Archive - find messages":
response.messages.anyIt(it == msg3) response.messages.anyIt(it == msg3)
test "handle query with forward pagination": test "handle query with forward pagination":
## Setup
let
driver = newTestArchiveDriver()
archive = newTestWakuArchive(driver)
let currentTime = now()
let msgList = @[
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=currentTime - 9),
fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=currentTime - 8),
fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=currentTime - 7),
fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=currentTime - 6),
fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=currentTime - 5),
fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=currentTime - 4),
fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic, ts=currentTime - 3),
fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic, ts=currentTime - 2),
fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic, ts=currentTime - 1),
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=currentTime)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## Given ## Given
var req = ArchiveQuery( let req = ArchiveQuery(
contentTopics: @[DefaultContentTopic], pageSize: 4,
pageSize: 2,
ascending: true ascending: true
) )
## When ## When
var res = archive.findMessages(req) var nextReq = req # copy
require res.isOk()
var var pages = newSeq[seq[WakuMessage]](3)
response = res.tryGet() var cursors = newSeq[Option[ArchiveCursor]](3)
totalMessages = response.messages.len()
totalQueries = 1
while response.cursor.isSome(): for i in 0..<3:
require: let res = archiveA.findMessages(nextReq)
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
response.messages.len() == 2
req.cursor = response.cursor
# Continue querying
res = archive.findMessages(req)
require res.isOk() require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len() # Keep query response content
totalQueries += 1 let response = res.get()
pages[i] = response.messages
cursors[i] = response.cursor
# Set/update the request cursor
nextReq.cursor = cursors[i]
## Then ## Then
check: check:
totalQueries == 4 # 4 queries of pageSize 2 cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[3]))
totalMessages == 8 # 8 messages in total cursors[1] == some(computeTestCursor(DefaultPubsubTopic, msgListA[7]))
cursors[2] == none(ArchiveCursor)
check:
pages[0] == msgListA[0..3]
pages[1] == msgListA[4..7]
pages[2] == msgListA[8..9]
test "handle query with backward pagination": test "handle query with backward pagination":
## Setup
let
driver = newTestArchiveDriver()
archive = newTestWakuArchive(driver)
let currentTime = now()
let msgList = @[
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=currentTime - 9),
fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=currentTime - 8),
fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=currentTime - 7),
fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=currentTime - 6),
fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=currentTime - 5),
fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=currentTime - 4),
fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic, ts=currentTime - 3),
fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic, ts=currentTime - 2),
fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic, ts=currentTime - 1),
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=currentTime)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## Given ## Given
var req = ArchiveQuery( let req = ArchiveQuery(
contentTopics: @[DefaultContentTopic], pageSize: 4,
pageSize: 2, ascending: false # backward
ascending: false
) )
## When ## When
var res = archive.findMessages(req) var nextReq = req # copy
require res.isOk()
var var pages = newSeq[seq[WakuMessage]](3)
response = res.tryGet() var cursors = newSeq[Option[ArchiveCursor]](3)
totalMessages = response.messages.len()
totalQueries = 1
while response.cursor.isSome(): for i in 0..<3:
require: let res = archiveA.findMessages(nextReq)
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
response.messages.len() == 2
req.cursor = response.cursor
# Continue querying
res = archive.findMessages(req)
require res.isOk() require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len() # Keep query response content
totalQueries += 1 let response = res.get()
pages[i] = response.messages
cursors[i] = response.cursor
# Set/update the request cursor
nextReq.cursor = cursors[i]
## Then ## Then
check: check:
totalQueries == 4 # 4 queries of pageSize 2 cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[6]))
totalMessages == 8 # 8 messages in total cursors[1] == some(computeTestCursor(DefaultPubsubTopic, msgListA[2]))
cursors[2] == none(ArchiveCursor)
check:
pages[0] == msgListA[6..9]
pages[1] == msgListA[2..5]
pages[2] == msgListA[0..1]
test "handle query with no paging info - auto-pagination": test "handle query with no paging info - auto-pagination":
## Setup ## Setup

View File

@ -5,7 +5,7 @@ else:
import import
std/[tables, times, sequtils, options], std/[tables, times, sequtils, options, algorithm],
stew/results, stew/results,
chronicles, chronicles,
chronos, chronos,
@ -129,6 +129,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.
if qContentTopics.len > 10: if qContentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics")) return err(ArchiveError.invalidQuery("too many content topics"))
let queryStartTime = getTime().toUnixFloat() let queryStartTime = getTime().toUnixFloat()
let queryRes = w.driver.getMessages( let queryRes = w.driver.getMessages(
@ -145,25 +146,23 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.
waku_archive_query_duration_seconds.observe(queryDuration) waku_archive_query_duration_seconds.observe(queryDuration)
# Build response # Build response
if queryRes.isErr(): if queryRes.isErr():
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error)) return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error))
let rows = queryRes.get() let rows = queryRes.get()
if rows.len <= 0: var messages = newSeq[WakuMessage]()
return ok(ArchiveResponse(
messages: @[],
cursor: none(ArchiveCursor)
))
# TODO: Move cursor generation to the driver implementation module
var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
else: rows[0..^2].mapIt(it[1])
var cursor = none(ArchiveCursor) var cursor = none(ArchiveCursor)
if rows.len == 0:
return ok(ArchiveResponse(messages: messages, cursor: cursor))
## Messages
let pageSize = min(rows.len, int(qMaxPageSize))
messages = rows[0..<pageSize].mapIt(it[1])
## Cursor
if rows.len > int(qMaxPageSize): if rows.len > int(qMaxPageSize):
## Build last message cursor ## Build last message cursor
## The cursor is built from the last message INCLUDED in the response ## The cursor is built from the last message INCLUDED in the response
@ -171,22 +170,25 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type # TODO: Improve coherence of MessageDigest type
var messageDigest: array[32, byte] let messageDigest = block:
for i in 0..<min(digest.len, 32): var data: array[32, byte]
messageDigest[i] = digest[i] for i in 0..<min(digest.len, 32):
data[i] = digest[i]
MessageDigest(data: data)
cursor = some(ArchiveCursor( cursor = some(ArchiveCursor(
pubsubTopic: pubsubTopic, pubsubTopic: pubsubTopic,
senderTime: message.timestamp, senderTime: message.timestamp,
storeTime: storeTimestamp, storeTime: storeTimestamp,
digest: MessageDigest(data: messageDigest) digest: messageDigest
)) ))
# All messages MUST be returned in chronological order
if not qAscendingOrder:
reverse(messages)
ok(ArchiveResponse( ok(ArchiveResponse(messages: messages, cursor: cursor))
messages: messages,
cursor: cursor
))
# Retention policy # Retention policy

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/[options, algorithm], std/options,
stew/results, stew/results,
stew/sorted_set, stew/sorted_set,
chronicles chronicles
@ -279,13 +279,7 @@ method getMessages*(
if pageRes.isErr(): if pageRes.isErr():
return err($pageRes.error) return err($pageRes.error)
var rows = pageRes.value ok(pageRes.value)
# All messages MUST be returned in chronological order
if not ascendingOrder:
reverse(rows)
ok(rows)
method getMessagesCount*(driver: QueueDriver): ArchiveDriverResult[int64] = method getMessagesCount*(driver: QueueDriver): ArchiveDriverResult[int64] =

View File

@ -6,7 +6,7 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/[options, algorithm], std/options,
stew/[byteutils, results], stew/[byteutils, results],
chronicles chronicles
import import
@ -107,7 +107,7 @@ method getMessages*(
): ArchiveDriverResult[seq[ArchiveRow]] = ): ArchiveDriverResult[seq[ArchiveRow]] =
let cursor = cursor.map(toDbCursor) let cursor = cursor.map(toDbCursor)
var rows = ?s.db.selectMessagesByHistoryQueryWithLimit( let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic, contentTopic,
pubsubTopic, pubsubTopic,
cursor, cursor,
@ -117,10 +117,6 @@ method getMessages*(
ascending=ascendingOrder ascending=ascendingOrder
) )
# All messages MUST be returned in chronological order
if not ascendingOrder:
reverse(rows)
ok(rows) ok(rows)