diff --git a/tests/v2/test_message_store_queue.nim b/tests/v2/test_message_store_queue.nim index 939273703..9cebc0b07 100644 --- a/tests/v2/test_message_store_queue.nim +++ b/tests/v2/test_message_store_queue.nim @@ -3,8 +3,7 @@ import std/[sequtils, strutils], stew/results, - testutils/unittests, - nimcrypto/hash + testutils/unittests import ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/protocol/waku_message, @@ -24,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = cursor = Index( receiverTime: Timestamp(i), senderTime: Timestamp(i), - digest: MDigest[256](data: data), + digest: MessageDigest(data: data), pubsubTopic: "test-pubsub-topic" ) @@ -182,35 +181,40 @@ procSuite "Sorted store queue": ## When let pageRes1 = store.getPage(predicate, pagingInfo) - let pageRes2 = store.getPage(predicate, pageRes1[1]) - let pageRes3 = store.getPage(predicate, pageRes2[1]) + require pageRes1.isOk() + let pageRes2 = store.getPage(predicate, pageRes1.value[1]) + require pageRes2.isOk() + let pageRes3 = store.getPage(predicate, pageRes2.value[1]) ## Then # First page - var (res, pInfo, err) = pageRes1 + check pageRes1.isOk() + + var (res, pInfo) = pageRes1.get() check: pInfo.pageSize == 3 pInfo.direction == PagingDirection.FORWARD pInfo.cursor.senderTime == Timestamp(3) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[1,2,3] # Second page - (res, pInfo, err) = pageRes2 + check pageRes2.isOk() + + (res, pInfo) = pageRes2.get() check: pInfo.pageSize == 2 pInfo.direction == PagingDirection.FORWARD pInfo.cursor.senderTime == Timestamp(5) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[4,5] # Empty last page - (res, pInfo, err) = pageRes3 + check pageRes3.isOk() + + (res, pInfo) = pageRes3.get() check: pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD pInfo.cursor.senderTime == Timestamp(5) - err == HistoryResponseError.NONE res.len == 0 test "backward pagination": @@ -226,35 +230,40 @@ procSuite "Sorted store queue": ## When let pageRes1 = store.getPage(predicate, pagingInfo) - let pageRes2 = store.getPage(predicate, pageRes1[1]) - let pageRes3 = store.getPage(predicate, pageRes2[1]) + require pageRes1.isOk() + let pageRes2 = store.getPage(predicate, pageRes1.value[1]) + require pageRes2.isOk() + let pageRes3 = store.getPage(predicate, pageRes2.value[1]) ## Then # First page - var (res, pInfo, err) = pageRes1 + check pageRes1.isOk() + + var (res, pInfo) = pageRes1.get() check: pInfo.pageSize == 3 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(3) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[3,4,5] # Second page - (res, pInfo, err) = pageRes2 + check pageRes2.isOk() + + (res, pInfo) = pageRes2.get() check: pInfo.pageSize == 2 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(1) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[1,2] # Empty last page - (res, pInfo, err) = pageRes3 + check pageRes3.isOk() + + (res, pInfo) = pageRes3.get() check: pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(1) - err == HistoryResponseError.NONE res.len == 0 test "Store queue pagination works with predicate - fwd direction": @@ -267,27 +276,30 @@ procSuite "Sorted store queue": proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0 ## When - let resPage1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)) - let resPage2 = store.getPage(onlyEvenTimes, resPage1[1]) + let pageRes1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)) + require pageRes1.isOk() + let pageRes2 = store.getPage(onlyEvenTimes, pageRes1.value[1]) ## Then # First page - var (res, pInfo, err) = resPage1 + check pageRes1.isOk() + + var (res, pInfo) = pageRes1.get() check: pInfo.pageSize == 2 pInfo.direction == PagingDirection.FORWARD pInfo.cursor.senderTime == Timestamp(4) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[2,4] - (res, pInfo, err) = resPage2 # Empty next page + check pageRes2.isOk() + + (res, pInfo) = pageRes2.get() check: pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD pInfo.cursor.senderTime == Timestamp(4) - err == HistoryResponseError.NONE res.len == 0 test "Store queue pagination works with predicate - bwd direction": @@ -300,42 +312,47 @@ procSuite "Sorted store queue": proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0 ## When - let resPage1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)) - let resPage2 = store.getPage(onlyOddTimes, resPage1[1]) - let resPage3 = store.getPage(onlyOddTimes, resPage2[1]) + let pageRes1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)) + require pageRes1.isOk() + let pageRes2 = store.getPage(onlyOddTimes, pageRes1.value[1]) + require pageRes2.isOk() + let pageRes3 = store.getPage(onlyOddTimes, pageRes2.value[1]) ## Then # First page - var (res, pInfo, err) = resPage1 + check pageRes1.isOk() + + var (res, pInfo) = pageRes1.get() check: pInfo.pageSize == 2 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(3) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[3,5] # Next page - (res, pInfo, err) = resPage2 + check pageRes2.isOk() + + (res, pInfo) = pageRes2.get() check: pInfo.pageSize == 1 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(1) - err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[1] # Empty last page - (res, pInfo, err) = resPage3 + check pageRes3.isOk() + + (res, pInfo) = pageRes3.get() check: pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(1) - err == HistoryResponseError.NONE res.len == 0 test "handle pagination on empty store - fwd direction": ## Given let capacity = 5 - var store = StoreQueueRef.new(capacity) + let store = StoreQueueRef.new(capacity) proc predicate(i: IndexedWakuMessage): bool = true # no filtering @@ -343,51 +360,44 @@ procSuite "Sorted store queue": ## When # Get page from empty queue in fwd dir - let (res, pInfo, err) = store.getPage(predicate, pagingInfo) + let pageRes = store.getPage(predicate, pagingInfo) ## Then # Empty response + check pageRes.isOk() + + let (res, pInfo) = pageRes.get() check: pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD pInfo.cursor.senderTime == Timestamp(0) - err == HistoryResponseError.NONE res.len == 0 test "handle pagination on empty store - bwd direction": + ## Given let capacity = 5 - var store = StoreQueueRef.new(capacity) + let store = StoreQueueRef.new(capacity) proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - # Get page from empty queue in bwd dir - - var (res, pInfo, err) = store.getPage(predicate, - PagingInfo(pageSize: 3, - direction: PagingDirection.BACKWARD)) + let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD) + + ## When + # Get page from empty queue in bwd dir + let pageRes = store.getPage(predicate, pagingInfo) + + ## Then + # Empty response + check pageRes.isOk() + + let (res, pInfo) = pageRes.get() check: # Empty response pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD pInfo.cursor.senderTime == Timestamp(0) - err == HistoryResponseError.NONE res.len == 0 - # Get page from empty queue in fwd dir - - (res, pInfo, err) = store.getPage(predicate, - PagingInfo(pageSize: 3, - direction: PagingDirection.FORWARD)) - - check: - # Empty response - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(0) - err == HistoryResponseError.NONE - res.len == 0 - test "handle invalid cursor - fwd direction": ## Given let @@ -397,20 +407,16 @@ procSuite "Sorted store queue": proc predicate(i: IndexedWakuMessage): bool = true # no filtering - let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()) + let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest()) let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD) ## When - let (res, pInfo, err) = store.getPage(predicate, pagingInfo) - + let pageRes = store.getPage(predicate, pagingInfo) + ## Then - # Empty response with error check: - res.len == 0 - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(3) - err == HistoryResponseError.INVALID_CURSOR + pageRes.isErr() + pageRes.error == HistoryResponseError.INVALID_CURSOR test "handle invalid cursor - bwd direction": ## Given @@ -421,21 +427,18 @@ procSuite "Sorted store queue": proc predicate(i: IndexedWakuMessage): bool = true # no filtering - let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()) + let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest()) let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD) ## When - let (res, pInfo, err) = store.getPage(predicate, pagingInfo) + let pageRes = store.getPage(predicate, pagingInfo) ## Then # Empty response with error check: - res.len == 0 - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(3) - err == HistoryResponseError.INVALID_CURSOR - + pageRes.isErr() + pageRes.error == HistoryResponseError.INVALID_CURSOR + test "verify if store queue contains an index": ## Given let diff --git a/tests/v2/test_message_store_queue_pagination.nim b/tests/v2/test_message_store_queue_pagination.nim index 5dcd1331f..54d59d24f 100644 --- a/tests/v2/test_message_store_queue_pagination.nim +++ b/tests/v2/test_message_store_queue_pagination.nim @@ -36,9 +36,12 @@ proc getTestStoreQueue(numMessages: int): StoreQueueRef = return testStoreQueue -proc getTestTimestamp(): Timestamp = - let now = getNanosecondTime(epochTime()) - Timestamp(now) +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +proc ts(offset=0, origin=now()): Timestamp = + origin + getNanosecondTime(offset) + suite "Queue store - pagination": test "Forward pagination test": @@ -50,109 +53,96 @@ suite "Queue store - pagination": var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) # test for a normal pagination - var (data, newPagingInfo, error) = getPage(store, pagingInfo) + var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 2 data == msgList[4..5] newPagingInfo.cursor == indexList[5].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == pagingInfo.pageSize - error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 2 data == msgList[0..1] newPagingInfo.cursor == indexList[1].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 2 - error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor to fetch the entire history pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 10 data == msgList[0..9] newPagingInfo.cursor == indexList[9].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 10 - error == HistoryResponseError.NONE # test for an empty msgList pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(getTestStoreQueue(0), pagingInfo) + (data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet() check: data.len == 0 newPagingInfo.pageSize == 0 newPagingInfo.direction == pagingInfo.direction newPagingInfo.cursor == pagingInfo.cursor - error == HistoryResponseError.NONE # test for a page size larger than the remaining messages pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 6 data == msgList[4..9] newPagingInfo.cursor == indexList[9].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 6 - error == HistoryResponseError.NONE # test for a page size larger than the maximum allowed page size pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: uint64(data.len) <= MaxPageSize newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize <= MaxPageSize - error == HistoryResponseError.NONE # test for a cursor pointing to the end of the message list pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 0 newPagingInfo.cursor == indexList[9].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 - error == HistoryResponseError.NONE # test for an invalid cursor - let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) + let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + var error = getPage(store, pagingInfo).tryError() check: - data.len == 0 - newPagingInfo.cursor == pagingInfo.cursor - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 0 error == HistoryResponseError.INVALID_CURSOR # test initial paging query over a message list with one message var singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) + (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() check: data.len == 1 newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 1 - error == HistoryResponseError.NONE # test pagination over a message list with one message singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) + (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() check: data.len == 0 newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 - error == HistoryResponseError.NONE test "Backward pagination test": let @@ -163,105 +153,92 @@ suite "Queue store - pagination": var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) # test for a normal pagination - var (data, newPagingInfo, error) = getPage(store, pagingInfo) + var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data == msgList[1..2] newPagingInfo.cursor == indexList[1].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == pagingInfo.pageSize - error == HistoryResponseError.NONE # test for an empty msgList pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(getTestStoreQueue(0), pagingInfo) + (data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet() check: data.len == 0 newPagingInfo.pageSize == 0 newPagingInfo.direction == pagingInfo.direction newPagingInfo.cursor == pagingInfo.cursor - error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 2 data == msgList[8..9] newPagingInfo.cursor == indexList[8].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 2 - error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor to fetch the entire history pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 10 data == msgList[0..9] newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 10 - error == HistoryResponseError.NONE # test for a page size larger than the remaining messages pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data == msgList[0..2] newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 3 - error == HistoryResponseError.NONE # test for a page size larger than the Maximum allowed page size pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: uint64(data.len) <= MaxPageSize newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize <= MaxPageSize - error == HistoryResponseError.NONE # test for a cursor pointing to the begining of the message list pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() check: data.len == 0 newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 - error == HistoryResponseError.NONE # test for an invalid cursor - let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) + let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(store, pagingInfo) + var error = getPage(store, pagingInfo).tryError() check: - data.len == 0 - newPagingInfo.cursor == pagingInfo.cursor - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 0 error == HistoryResponseError.INVALID_CURSOR # test initial paging query over a message list with one message var singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) + (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() check: data.len == 1 newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 1 - error == HistoryResponseError.NONE # test paging query over a message list with one message singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) + (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() check: data.len == 0 newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 - error == HistoryResponseError.NONE diff --git a/waku/v2/node/storage/message/queue_store/queue_store.nim b/waku/v2/node/storage/message/queue_store/queue_store.nim index ecff359c4..31838a161 100644 --- a/waku/v2/node/storage/message/queue_store/queue_store.nim +++ b/waku/v2/node/storage/message/queue_store/queue_store.nim @@ -30,6 +30,7 @@ type QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} + StoreQueueGetPageResult = Result[(seq[WakuMessage], PagingInfo), HistoryResponseError] type StoreQueueRef* = ref object of MessageStore @@ -95,97 +96,26 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], return prevItem -proc fwdPage(storeQueue: StoreQueueRef, +proc getPage(storeQueue: StoreQueueRef, pred: QueryFilterMatcher, maxPageSize: uint64, - startCursor: Option[Index]): - (seq[WakuMessage], PagingInfo, HistoryResponseError) = + forward: bool, + startCursor: Option[Index]): StoreQueueGetPageResult = ## Populate a single page in forward direction ## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined. ## Page size must not exceed `maxPageSize` ## Each entry must match the `pred` - trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor + trace "Retrieving page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor, forward=forward var outSeq: seq[WakuMessage] outPagingInfo: PagingInfo - outError: HistoryResponseError - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - currentEntry: SortedSetResult[Index, IndexedWakuMessage] - lastValidCursor: Index - - # Find first entry - if startCursor.isSome(): - lastValidCursor = startCursor.get() - - let cursorEntry = w.ffdToCursor(startCursor.get()) - if cursorEntry.isErr: - # Quick exit here if start cursor not found - trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor - outSeq = @[] - outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.FORWARD) - outError = HistoryResponseError.INVALID_CURSOR - w.destroy - return (outSeq, outPagingInfo, outError) + var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + defer: w.destroy() - # Advance walker once more - currentEntry = w.next - else: - # Start from the beginning of the queue - lastValidCursor = Index() # No valid (only empty) last cursor - currentEntry = w.first - - trace "Starting fwd page query", currentEntry=currentEntry - - ## This loop walks forward over the queue: - ## 1. from the given cursor (or first entry, if not provided) - ## 2. adds entries matching the predicate function to output page - ## 3. until either the end of the queue or maxPageSize is reached - var numberOfItems = 0.uint - while currentEntry.isOk and numberOfItems < maxPageSize: - - trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems - - if pred(currentEntry.value.data): - trace "Current item matches predicate. Adding to output." - lastValidCursor = currentEntry.value.key - outSeq.add(currentEntry.value.data.msg) - numberOfItems += 1 - currentEntry = w.next - w.destroy - - outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, - cursor: lastValidCursor.toPagingIndex(), - direction: PagingDirection.FORWARD) - - outError = HistoryResponseError.NONE - - trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo - - return (outSeq, outPagingInfo, outError) - -proc bwdPage(storeQueue: StoreQueueRef, - pred: QueryFilterMatcher, - maxPageSize: uint64, - startCursor: Option[Index]): - (seq[WakuMessage], PagingInfo, HistoryResponseError) = - ## Populate a single page in backward direction - ## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined. - ## Page size must not exceed `maxPageSize` - ## Each entry must match the `pred` - - trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor - var - outSeq: seq[WakuMessage] - outPagingInfo: PagingInfo - outError: HistoryResponseError - - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) currentEntry: SortedSetResult[Index, IndexedWakuMessage] lastValidCursor: Index @@ -193,51 +123,52 @@ proc bwdPage(storeQueue: StoreQueueRef, if startCursor.isSome(): lastValidCursor = startCursor.get() - let cursorEntry = w.rwdToCursor(startCursor.get()) + let cursorEntry = if forward: w.ffdToCursor(startCursor.get()) + else: w.rwdToCursor(startCursor.get()) if cursorEntry.isErr(): # Quick exit here if start cursor not found - trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor - outSeq = @[] - outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.BACKWARD) - outError = HistoryResponseError.INVALID_CURSOR - w.destroy - return (outSeq, outPagingInfo, outError) - - # Step walker one more step back - currentEntry = w.prev + trace "Starting cursor not found", startCursor=startCursor.get() + return err(HistoryResponseError.INVALID_CURSOR) + + # Advance walker once more + currentEntry = if forward: w.next() + else: w.prev() else: - # Start from the back of the queue + # Start from the beginning of the queue lastValidCursor = Index() # No valid (only empty) last cursor - currentEntry = w.last - - trace "Starting bwd page query", currentEntry=currentEntry + currentEntry = if forward: w.first() + else: w.last() - ## This loop walks backward over the queue: - ## 1. from the given cursor (or last entry, if not provided) + trace "Starting page query", currentEntry=currentEntry + + ## This loop walks forward over the queue: + ## 1. from the given cursor (or first/last entry, if not provided) ## 2. adds entries matching the predicate function to output page - ## 3. until either the beginning of the queue or maxPageSize is reached + ## 3. until either the end of the queue or maxPageSize is reached var numberOfItems = 0.uint while currentEntry.isOk() and numberOfItems < maxPageSize: - trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems - + trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems + if pred(currentEntry.value.data): - trace "Current item matches predicate. Adding to output." lastValidCursor = currentEntry.value.key outSeq.add(currentEntry.value.data.msg) numberOfItems += 1 - currentEntry = w.prev - w.destroy + + currentEntry = if forward: w.next() + else: w.prev() + + trace "Successfully retrieved page", len=outSeq.len outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, cursor: lastValidCursor.toPagingIndex(), - direction: PagingDirection.BACKWARD) - outError = HistoryResponseError.NONE + direction: if forward: PagingDirection.FORWARD + else: PagingDirection.BACKWARD) - trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo - - return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order - outPagingInfo, - outError) + # Even if paging backwards, each page should be in forward order + if not forward: + outSeq.reverse() + + return ok((outSeq, outPagingInfo)) #### API @@ -347,8 +278,7 @@ method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): Me proc getPage*(storeQueue: StoreQueueRef, pred: QueryFilterMatcher, - pagingInfo: PagingInfo): - (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = + pagingInfo: PagingInfo): StoreQueueGetPageResult {.gcsafe.} = ## Get a single page of history matching the predicate and ## adhering to the pagingInfo parameters @@ -359,15 +289,10 @@ proc getPage*(storeQueue: StoreQueueRef, else: some(pagingInfo.cursor.toIndex()) maxPageSize = pagingInfo.pageSize - case pagingInfo.direction - of PagingDirection.FORWARD: - return storeQueue.fwdPage(pred, maxPageSize, cursorOpt) - of PagingDirection.BACKWARD: - return storeQueue.bwdPage(pred, maxPageSize, cursorOpt) + let forward = pagingInfo.direction == PagingDirection.FORWARD + return storeQueue.getPage(pred, maxPageSize, forward, cursorOpt) -proc getPage*(storeQueue: StoreQueueRef, - pagingInfo: PagingInfo): - (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = +proc getPage*(storeQueue: StoreQueueRef, pagingInfo: PagingInfo): StoreQueueGetPageResult {.gcsafe.} = ## Get a single page of history without filtering. ## Adhere to the pagingInfo parameters @@ -418,11 +343,11 @@ method getMessagesByHistoryQuery*( direction: if ascendingOrder: PagingDirection.FORWARD else: PagingDirection.BACKWARD ) - let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo) - - if error == HistoryResponseError.INVALID_CURSOR: + let getPageRes = store.getPage(matchesQuery, queryPagingInfo) + if getPageRes.isErr(): return err("invalid cursor") + let (messages, pagingInfo) = getPageRes.value if messages.len == 0: return ok((messages, none(PagingInfo)))