refactor(store): simplify store queue query implementation

This commit is contained in:
Lorenzo Delgado 2022-10-03 08:40:01 +02:00 committed by GitHub
parent d614e4504b
commit c61e88b352
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 151 additions and 246 deletions

View File

@ -3,8 +3,7 @@
import
std/[sequtils, strutils],
stew/results,
testutils/unittests,
nimcrypto/hash
testutils/unittests
import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/waku_message,
@ -24,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
cursor = Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MDigest[256](data: data),
digest: MessageDigest(data: data),
pubsubTopic: "test-pubsub-topic"
)
@ -182,35 +181,40 @@ procSuite "Sorted store queue":
## When
let pageRes1 = store.getPage(predicate, pagingInfo)
let pageRes2 = store.getPage(predicate, pageRes1[1])
let pageRes3 = store.getPage(predicate, pageRes2[1])
require pageRes1.isOk()
let pageRes2 = store.getPage(predicate, pageRes1.value[1])
require pageRes2.isOk()
let pageRes3 = store.getPage(predicate, pageRes2.value[1])
## Then
# First page
var (res, pInfo, err) = pageRes1
check pageRes1.isOk()
var (res, pInfo) = pageRes1.get()
check:
pInfo.pageSize == 3
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1,2,3]
# Second page
(res, pInfo, err) = pageRes2
check pageRes2.isOk()
(res, pInfo) = pageRes2.get()
check:
pInfo.pageSize == 2
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(5)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[4,5]
# Empty last page
(res, pInfo, err) = pageRes3
check pageRes3.isOk()
(res, pInfo) = pageRes3.get()
check:
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(5)
err == HistoryResponseError.NONE
res.len == 0
test "backward pagination":
@ -226,35 +230,40 @@ procSuite "Sorted store queue":
## When
let pageRes1 = store.getPage(predicate, pagingInfo)
let pageRes2 = store.getPage(predicate, pageRes1[1])
let pageRes3 = store.getPage(predicate, pageRes2[1])
require pageRes1.isOk()
let pageRes2 = store.getPage(predicate, pageRes1.value[1])
require pageRes2.isOk()
let pageRes3 = store.getPage(predicate, pageRes2.value[1])
## Then
# First page
var (res, pInfo, err) = pageRes1
check pageRes1.isOk()
var (res, pInfo) = pageRes1.get()
check:
pInfo.pageSize == 3
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[3,4,5]
# Second page
(res, pInfo, err) = pageRes2
check pageRes2.isOk()
(res, pInfo) = pageRes2.get()
check:
pInfo.pageSize == 2
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1,2]
# Empty last page
(res, pInfo, err) = pageRes3
check pageRes3.isOk()
(res, pInfo) = pageRes3.get()
check:
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.len == 0
test "Store queue pagination works with predicate - fwd direction":
@ -267,27 +276,30 @@ procSuite "Sorted store queue":
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
## When
let resPage1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))
let resPage2 = store.getPage(onlyEvenTimes, resPage1[1])
let pageRes1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))
require pageRes1.isOk()
let pageRes2 = store.getPage(onlyEvenTimes, pageRes1.value[1])
## Then
# First page
var (res, pInfo, err) = resPage1
check pageRes1.isOk()
var (res, pInfo) = pageRes1.get()
check:
pInfo.pageSize == 2
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(4)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[2,4]
(res, pInfo, err) = resPage2
# Empty next page
check pageRes2.isOk()
(res, pInfo) = pageRes2.get()
check:
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(4)
err == HistoryResponseError.NONE
res.len == 0
test "Store queue pagination works with predicate - bwd direction":
@ -300,42 +312,47 @@ procSuite "Sorted store queue":
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
## When
let resPage1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
let resPage2 = store.getPage(onlyOddTimes, resPage1[1])
let resPage3 = store.getPage(onlyOddTimes, resPage2[1])
let pageRes1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
require pageRes1.isOk()
let pageRes2 = store.getPage(onlyOddTimes, pageRes1.value[1])
require pageRes2.isOk()
let pageRes3 = store.getPage(onlyOddTimes, pageRes2.value[1])
## Then
# First page
var (res, pInfo, err) = resPage1
check pageRes1.isOk()
var (res, pInfo) = pageRes1.get()
check:
pInfo.pageSize == 2
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[3,5]
# Next page
(res, pInfo, err) = resPage2
check pageRes2.isOk()
(res, pInfo) = pageRes2.get()
check:
pInfo.pageSize == 1
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1]
# Empty last page
(res, pInfo, err) = resPage3
check pageRes3.isOk()
(res, pInfo) = pageRes3.get()
check:
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.len == 0
test "handle pagination on empty store - fwd direction":
## Given
let capacity = 5
var store = StoreQueueRef.new(capacity)
let store = StoreQueueRef.new(capacity)
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
@ -343,51 +360,44 @@ procSuite "Sorted store queue":
## When
# Get page from empty queue in fwd dir
let (res, pInfo, err) = store.getPage(predicate, pagingInfo)
let pageRes = store.getPage(predicate, pagingInfo)
## Then
# Empty response
check pageRes.isOk()
let (res, pInfo) = pageRes.get()
check:
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE
res.len == 0
test "handle pagination on empty store - bwd direction":
## Given
let capacity = 5
var store = StoreQueueRef.new(capacity)
let store = StoreQueueRef.new(capacity)
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
# Get page from empty queue in bwd dir
var (res, pInfo, err) = store.getPage(predicate,
PagingInfo(pageSize: 3,
direction: PagingDirection.BACKWARD))
let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD)
## When
# Get page from empty queue in bwd dir
let pageRes = store.getPage(predicate, pagingInfo)
## Then
# Empty response
check pageRes.isOk()
let (res, pInfo) = pageRes.get()
check:
# Empty response
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE
res.len == 0
# Get page from empty queue in fwd dir
(res, pInfo, err) = store.getPage(predicate,
PagingInfo(pageSize: 3,
direction: PagingDirection.FORWARD))
check:
# Empty response
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE
res.len == 0
test "handle invalid cursor - fwd direction":
## Given
let
@ -397,20 +407,16 @@ procSuite "Sorted store queue":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest())
let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD)
## When
let (res, pInfo, err) = store.getPage(predicate, pagingInfo)
let pageRes = store.getPage(predicate, pagingInfo)
## Then
# Empty response with error
check:
res.len == 0
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.INVALID_CURSOR
pageRes.isErr()
pageRes.error == HistoryResponseError.INVALID_CURSOR
test "handle invalid cursor - bwd direction":
## Given
@ -421,21 +427,18 @@ procSuite "Sorted store queue":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest())
let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD)
## When
let (res, pInfo, err) = store.getPage(predicate, pagingInfo)
let pageRes = store.getPage(predicate, pagingInfo)
## Then
# Empty response with error
check:
res.len == 0
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.INVALID_CURSOR
pageRes.isErr()
pageRes.error == HistoryResponseError.INVALID_CURSOR
test "verify if store queue contains an index":
## Given
let

View File

@ -36,9 +36,12 @@ proc getTestStoreQueue(numMessages: int): StoreQueueRef =
return testStoreQueue
proc getTestTimestamp(): Timestamp =
let now = getNanosecondTime(epochTime())
Timestamp(now)
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc ts(offset=0, origin=now()): Timestamp =
origin + getNanosecondTime(offset)
suite "Queue store - pagination":
test "Forward pagination test":
@ -50,109 +53,96 @@ suite "Queue store - pagination":
var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
# test for a normal pagination
var (data, newPagingInfo, error) = getPage(store, pagingInfo)
var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 2
data == msgList[4..5]
newPagingInfo.cursor == indexList[5].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == pagingInfo.pageSize
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 2
data == msgList[0..1]
newPagingInfo.cursor == indexList[1].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 2
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 10
data == msgList[0..9]
newPagingInfo.cursor == indexList[9].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 10
error == HistoryResponseError.NONE
# test for an empty msgList
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(getTestStoreQueue(0), pagingInfo)
(data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet()
check:
data.len == 0
newPagingInfo.pageSize == 0
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.cursor == pagingInfo.cursor
error == HistoryResponseError.NONE
# test for a page size larger than the remaining messages
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 6
data == msgList[4..9]
newPagingInfo.cursor == indexList[9].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 6
error == HistoryResponseError.NONE
# test for a page size larger than the maximum allowed page size
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
uint64(data.len) <= MaxPageSize
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize <= MaxPageSize
error == HistoryResponseError.NONE
# test for a cursor pointing to the end of the message list
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 0
newPagingInfo.cursor == indexList[9].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
# test for an invalid cursor
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
var error = getPage(store, pagingInfo).tryError()
check:
data.len == 0
newPagingInfo.cursor == pagingInfo.cursor
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.INVALID_CURSOR
# test initial paging query over a message list with one message
var singleItemMsgList = getTestStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
(data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
check:
data.len == 1
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 1
error == HistoryResponseError.NONE
# test pagination over a message list with one message
singleItemMsgList = getTestStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
(data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
check:
data.len == 0
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
test "Backward pagination test":
let
@ -163,105 +153,92 @@ suite "Queue store - pagination":
var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
# test for a normal pagination
var (data, newPagingInfo, error) = getPage(store, pagingInfo)
var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data == msgList[1..2]
newPagingInfo.cursor == indexList[1].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == pagingInfo.pageSize
error == HistoryResponseError.NONE
# test for an empty msgList
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(getTestStoreQueue(0), pagingInfo)
(data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet()
check:
data.len == 0
newPagingInfo.pageSize == 0
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.cursor == pagingInfo.cursor
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 2
data == msgList[8..9]
newPagingInfo.cursor == indexList[8].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 2
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 10
data == msgList[0..9]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 10
error == HistoryResponseError.NONE
# test for a page size larger than the remaining messages
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data == msgList[0..2]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 3
error == HistoryResponseError.NONE
# test for a page size larger than the Maximum allowed page size
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
uint64(data.len) <= MaxPageSize
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize <= MaxPageSize
error == HistoryResponseError.NONE
# test for a cursor pointing to the begining of the message list
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
(data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
check:
data.len == 0
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
# test for an invalid cursor
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
var error = getPage(store, pagingInfo).tryError()
check:
data.len == 0
newPagingInfo.cursor == pagingInfo.cursor
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.INVALID_CURSOR
# test initial paging query over a message list with one message
var singleItemMsgList = getTestStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
(data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
check:
data.len == 1
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 1
error == HistoryResponseError.NONE
# test paging query over a message list with one message
singleItemMsgList = getTestStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
(data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
check:
data.len == 0
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE

View File

@ -30,6 +30,7 @@ type
QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
StoreQueueGetPageResult = Result[(seq[WakuMessage], PagingInfo), HistoryResponseError]
type
StoreQueueRef* = ref object of MessageStore
@ -95,97 +96,26 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
return prevItem
proc fwdPage(storeQueue: StoreQueueRef,
proc getPage(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
maxPageSize: uint64,
startCursor: Option[Index]):
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
forward: bool,
startCursor: Option[Index]): StoreQueueGetPageResult =
## Populate a single page in forward direction
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor
trace "Retrieving page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor, forward=forward
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
outError: HistoryResponseError
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
lastValidCursor: Index
# Find first entry
if startCursor.isSome():
lastValidCursor = startCursor.get()
let cursorEntry = w.ffdToCursor(startCursor.get())
if cursorEntry.isErr:
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.FORWARD)
outError = HistoryResponseError.INVALID_CURSOR
w.destroy
return (outSeq, outPagingInfo, outError)
var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
defer: w.destroy()
# Advance walker once more
currentEntry = w.next
else:
# Start from the beginning of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.first
trace "Starting fwd page query", currentEntry=currentEntry
## This loop walks forward over the queue:
## 1. from the given cursor (or first entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the end of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
currentEntry = w.next
w.destroy
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
cursor: lastValidCursor.toPagingIndex(),
direction: PagingDirection.FORWARD)
outError = HistoryResponseError.NONE
trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo
return (outSeq, outPagingInfo, outError)
proc bwdPage(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
maxPageSize: uint64,
startCursor: Option[Index]):
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
## Populate a single page in backward direction
## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
outError: HistoryResponseError
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
lastValidCursor: Index
@ -193,51 +123,52 @@ proc bwdPage(storeQueue: StoreQueueRef,
if startCursor.isSome():
lastValidCursor = startCursor.get()
let cursorEntry = w.rwdToCursor(startCursor.get())
let cursorEntry = if forward: w.ffdToCursor(startCursor.get())
else: w.rwdToCursor(startCursor.get())
if cursorEntry.isErr():
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.INVALID_CURSOR
w.destroy
return (outSeq, outPagingInfo, outError)
# Step walker one more step back
currentEntry = w.prev
trace "Starting cursor not found", startCursor=startCursor.get()
return err(HistoryResponseError.INVALID_CURSOR)
# Advance walker once more
currentEntry = if forward: w.next()
else: w.prev()
else:
# Start from the back of the queue
# Start from the beginning of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.last
trace "Starting bwd page query", currentEntry=currentEntry
currentEntry = if forward: w.first()
else: w.last()
## This loop walks backward over the queue:
## 1. from the given cursor (or last entry, if not provided)
trace "Starting page query", currentEntry=currentEntry
## This loop walks forward over the queue:
## 1. from the given cursor (or first/last entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the beginning of the queue or maxPageSize is reached
## 3. until either the end of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk() and numberOfItems < maxPageSize:
trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
currentEntry = w.prev
w.destroy
currentEntry = if forward: w.next()
else: w.prev()
trace "Successfully retrieved page", len=outSeq.len
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
cursor: lastValidCursor.toPagingIndex(),
direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.NONE
direction: if forward: PagingDirection.FORWARD
else: PagingDirection.BACKWARD)
trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo
return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order
outPagingInfo,
outError)
# Even if paging backwards, each page should be in forward order
if not forward:
outSeq.reverse()
return ok((outSeq, outPagingInfo))
#### API
@ -347,8 +278,7 @@ method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): Me
proc getPage*(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
pagingInfo: PagingInfo):
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
pagingInfo: PagingInfo): StoreQueueGetPageResult {.gcsafe.} =
## Get a single page of history matching the predicate and
## adhering to the pagingInfo parameters
@ -359,15 +289,10 @@ proc getPage*(storeQueue: StoreQueueRef,
else: some(pagingInfo.cursor.toIndex())
maxPageSize = pagingInfo.pageSize
case pagingInfo.direction
of PagingDirection.FORWARD:
return storeQueue.fwdPage(pred, maxPageSize, cursorOpt)
of PagingDirection.BACKWARD:
return storeQueue.bwdPage(pred, maxPageSize, cursorOpt)
let forward = pagingInfo.direction == PagingDirection.FORWARD
return storeQueue.getPage(pred, maxPageSize, forward, cursorOpt)
proc getPage*(storeQueue: StoreQueueRef,
pagingInfo: PagingInfo):
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
proc getPage*(storeQueue: StoreQueueRef, pagingInfo: PagingInfo): StoreQueueGetPageResult {.gcsafe.} =
## Get a single page of history without filtering.
## Adhere to the pagingInfo parameters
@ -418,11 +343,11 @@ method getMessagesByHistoryQuery*(
direction: if ascendingOrder: PagingDirection.FORWARD
else: PagingDirection.BACKWARD
)
let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo)
if error == HistoryResponseError.INVALID_CURSOR:
let getPageRes = store.getPage(matchesQuery, queryPagingInfo)
if getPageRes.isErr():
return err("invalid cursor")
let (messages, pagingInfo) = getPageRes.value
if messages.len == 0:
return ok((messages, none(PagingInfo)))