diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 2d14b6b3f..dbe2c1f0f 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -19,7 +19,9 @@ import ./v2/test_namespacing_utils, ./v2/test_waku_dnsdisc, ./v2/test_waku_discv5, - ./v2/test_enr_utils + ./v2/test_enr_utils, + ./v2/test_waku_store_queue, + ./v2/test_pagination_utils when defined(rln): import ./v2/test_waku_rln_relay diff --git a/tests/v2/test_pagination_utils.nim b/tests/v2/test_pagination_utils.nim new file mode 100644 index 000000000..62033efff --- /dev/null +++ b/tests/v2/test_pagination_utils.nim @@ -0,0 +1,83 @@ +{.used.} + +import + testutils/unittests, + chronos, + stew/byteutils, + libp2p/crypto/crypto, + ../../waku/v2/utils/pagination + +procSuite "Pagination utils": + + ## Helpers + proc hashFromStr(input: string): MDigest[256] = + var ctx: sha256 + + ctx.init() + ctx.update(input.toBytes()) # converts the input to bytes + + let hashed = ctx.finish() # computes the hash + ctx.clear() + + return hashed + + ## Test vars + let + smallIndex1 = Index(digest: hashFromStr("1234"), + receiverTime: 0.00, + senderTime: 1000.00) + smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime + receiverTime: 0.00, + senderTime: 1000.00) + largeIndex1 = Index(digest: hashFromStr("1234"), + receiverTime: 0.00, + senderTime: 9000.00) # only senderTime differ from smallIndex1 + largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1 + receiverTime: 0.00, + senderTime: 1000.00) + eqIndex1 = Index(digest: hashFromStr("0003"), + receiverTime: 0.00, + senderTime: 54321.00) + eqIndex2 = Index(digest: hashFromStr("0003"), + receiverTime: 0.00, + senderTime: 54321.00) + eqIndex3 = Index(digest: hashFromStr("0003"), + receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons + senderTime: 54321.00) + + + ## Test suite + asyncTest "Index comparison": + check: + # Index comparison with senderTime diff + cmp(smallIndex1, largeIndex1) < 0 + cmp(smallIndex2, largeIndex1) < 0 + + # Index comparison with digest diff + cmp(smallIndex1, smallIndex2) < 0 + cmp(smallIndex1, largeIndex2) < 0 + cmp(smallIndex2, largeIndex2) > 0 + cmp(largeIndex1, largeIndex2) > 0 + + # Index comparison when equal + cmp(eqIndex1, eqIndex2) == 0 + + # receiverTime difference play no role + cmp(eqIndex1, eqIndex3) == 0 + + asyncTest "Index equality": + check: + # Exactly equal + eqIndex1 == eqIndex2 + + # Receiver time plays no role + eqIndex1 == eqIndex3 + + # Unequal sender time + smallIndex1 != largeIndex1 + + # Unequal digest + smallIndex1 != smallIndex2 + + # Unequal hash and digest + smallIndex1 != eqIndex1 diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index 55c52c3df..d4ff5b95b 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -1,18 +1,27 @@ {.used.} import - std/[algorithm, options], + std/[algorithm, options, sequtils], testutils/unittests, nimcrypto/sha2, libp2p/protobuf/minprotobuf, ../../waku/v2/protocol/waku_store/waku_store, ../test_helpers -proc createSampleList(s: int): seq[IndexedWakuMessage] = - ## takes s as input and outputs a sequence with s amount of IndexedWakuMessage +proc createSampleStoreQueue(s: int): StoreQueueRef = + ## takes s as input and outputs a StoreQueue with s amount of IndexedWakuMessage + + let testStoreQueue = StoreQueueRef.new(s) + var data {.noinit.}: array[32, byte] for x in data.mitems: x = 1 + for i in 0.. 0 + prevSmaller = index + + # Walk backward through the set and verify descending order + var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index + for i in testStoreQueue.bwdIterator: + let (index, indexedWakuMessage) = i + check cmp(index, prevLarger) < 0 + prevLarger = index + + test "Can access first item from store queue": + let first = testStoreQueue.first() + check: + first.isOk() + first.get().msg.timestamp == 1.0 + + # Error condition + let emptyQ = StoreQueueRef.new(capacity) + check: + emptyQ.first().isErr() + + test "Can access last item from store queue": + let last = testStoreQueue.last() + check: + last.isOk() + last.get().msg.timestamp == 5.0 + + # Error condition + let emptyQ = StoreQueueRef.new(capacity) + check: + emptyQ.last().isErr() + + test "Store queue forward pagination works": + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + var (res, pInfo, err) = testStoreQueue.getPage(predicate, + PagingInfo(pageSize: 3, + direction: PagingDirection.FORWARD)) + + check: + # First page + pInfo.pageSize == 3 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 3.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[1,2,3] + + + (res, pInfo, err) = testStoreQueue.getPage(predicate, + pInfo) + + check: + # Second page + pInfo.pageSize == 2 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 5.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[4,5] + + (res, pInfo, err) = testStoreQueue.getPage(predicate, + pInfo) + + check: + # Empty last page + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 5.0 + err == HistoryResponseError.NONE + res.len == 0 + + test "Store queue backward pagination works": + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + var (res, pInfo, err) = testStoreQueue.getPage(predicate, + PagingInfo(pageSize: 3, + direction: PagingDirection.BACKWARD)) + + check: + # First page + pInfo.pageSize == 3 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 3.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[3,4,5] + + + (res, pInfo, err) = testStoreQueue.getPage(predicate, + pInfo) + + check: + # Second page + pInfo.pageSize == 2 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 1.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[1,2] + + (res, pInfo, err) = testStoreQueue.getPage(predicate, + pInfo) + + check: + # Empty last page + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 1.0 + err == HistoryResponseError.NONE + res.len == 0 + + test "Store queue pagination works with predicate": + proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0 + proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0 + + ## Forward pagination: only even timestamped messages + + var (res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes, + PagingInfo(pageSize: 2, + direction: PagingDirection.FORWARD)) + + check: + # First page + pInfo.pageSize == 2 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 4.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[2,4] + + (res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes, + pInfo) + + check: + # Empty next page + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 4.0 + err == HistoryResponseError.NONE + res.len == 0 + + ## Backward pagination: only odd timestamped messages + + (res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes, + PagingInfo(pageSize: 2, + direction: PagingDirection.BACKWARD)) + + check: + # First page + pInfo.pageSize == 2 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 3.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[3,5] + + (res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes, + pInfo) + + check: + # Next page + pInfo.pageSize == 1 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 1.0 + err == HistoryResponseError.NONE + res.mapIt(it.timestamp.int) == @[1] + + (res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes, + pInfo) + + check: + # Empty last page + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 1.0 + err == HistoryResponseError.NONE + res.len == 0 + + test "Store queue pagination handles invalid cursor": + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + # Invalid cursor in backwards direction + + var (res, pInfo, err) = testStoreQueue.getPage(predicate, + PagingInfo(pageSize: 3, + cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()), + direction: PagingDirection.BACKWARD)) + + check: + # Empty response with error + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 3.0 + err == HistoryResponseError.INVALID_CURSOR + res.len == 0 + + # Same test, but forward direction + + (res, pInfo, err) = testStoreQueue.getPage(predicate, + PagingInfo(pageSize: 3, + cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()), + direction: PagingDirection.FORWARD)) + + check: + # Empty response with error + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 3.0 + err == HistoryResponseError.INVALID_CURSOR + res.len == 0 + + test "Store queue pagination works on empty list": + var stQ = StoreQueueRef.new(capacity) + check: + stQ.len == 0 # Empty when initialised + + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + # Get page from empty queue in bwd dir + + var (res, pInfo, err) = stQ.getPage(predicate, + PagingInfo(pageSize: 3, + direction: PagingDirection.BACKWARD)) + + check: + # Empty response + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.BACKWARD + pInfo.cursor.senderTime == 0.0 + err == HistoryResponseError.NONE + res.len == 0 + + # Get page from empty queue in fwd dir + + (res, pInfo, err) = stQ.getPage(predicate, + PagingInfo(pageSize: 3, + direction: PagingDirection.FORWARD)) + + check: + # Empty response + pInfo.pageSize == 0 + pInfo.direction == PagingDirection.FORWARD + pInfo.cursor.senderTime == 0.0 + err == HistoryResponseError.NONE + res.len == 0 + + test "Can verify if store queue contains an index": + let + existingIndex = genIndexedWakuMessage(4).index + nonExistingIndex = genIndexedWakuMessage(99).index + check: + testStoreQueue.contains(existingIndex) == true + testStoreQueue.contains(nonExistingIndex) == false diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index ed935875b..1bee86608 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -1149,7 +1149,7 @@ procSuite "WakuNode": let index1 = computeIndex(msg1) let output1 = store.put(index1, msg1, DefaultTopic) check output1.isOk - node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic)) + discard node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic)) # now run the resume proc await node1.resume() diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index b97bec04c..fbddef80b 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az272-316: +# Libtool was configured on host fv-az196-575: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 33697f60b..7cf3ca99e 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -231,7 +231,6 @@ proc encode*(query: HistoryQuery): ProtoBuffer = return output - proc encode*(response: HistoryResponse): ProtoBuffer = var output = initProtoBuffer() @@ -253,128 +252,9 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer = return output -proc indexComparison* (x, y: Index): int = - ## compares x and y - ## returns 0 if they are equal - ## returns -1 if x < y - ## returns 1 if x > y - let - timecmp = system.cmp(x.senderTime, y.senderTime) - digestcm = system.cmp(x.digest.data, y.digest.data) - if timecmp != 0: # timestamp has a higher priority for comparison - return timecmp - return digestcm +proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = + ## Query history to return a single page of messages matching the query -proc indexedWakuMessageComparison*(x, y: IndexedWakuMessage): int = - ## compares x and y - ## returns 0 if they are equal - ## returns -1 if x < y - ## returns 1 if x > y - return indexComparison(x.index, y.index) - -proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] = - ## returns the position of an IndexedWakuMessage in msgList whose index value matches the given index - ## returns none if no match is found - for i, indexedWakuMessage in msgList: - if indexedWakuMessage.index == index: - return some(i) - return none(int) - -proc paginate*(msgList: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) = - ## takes a message list, and performs paging based on pinfo - ## the message list must be sorted - ## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request - var - cursor = pinfo.cursor - pageSize = pinfo.pageSize - dir = pinfo.direction - output: (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) - - if msgList.len == 0: # no pagination is needed for an empty list - output = (msgList, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE) - return output - - ## Adjust pageSize: - ## - pageSize should not exceed maximum - ## - pageSize being zero indicates "no pagination", but we still limit - ## responses to no more than a page of MaxPageSize messages - if (pageSize == uint64(0)) or (pageSize > MaxPageSize): - pageSize = MaxPageSize - - let total = uint64(msgList.len) - - # set the cursor of the initial paging request - var isInitialQuery = false - var cursorIndex: uint64 - if cursor == Index(): # an empty cursor means it is an initial query - isInitialQuery = true - case dir - of PagingDirection.FORWARD: - cursorIndex = 0 - cursor = msgList[cursorIndex].index # set the cursor to the beginning of the list - of PagingDirection.BACKWARD: - cursorIndex = total - 1 - cursor = msgList[cursorIndex].index # set the cursor to the end of the list - else: - var cursorIndexOption = msgList.findIndex(cursor) - if cursorIndexOption.isNone: # the cursor is not valid - output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR) - return output - cursorIndex = uint64(cursorIndexOption.get()) - - case dir - of PagingDirection.FORWARD: # forward pagination - # set the index of the first message in the page - # exclude the message pointing by the cursor - var startIndex = cursorIndex + 1 - # for the initial query, include the message pointing by the cursor - if isInitialQuery: - startIndex = cursorIndex - - # adjust the pageSize based on the total remaining messages - pageSize = min(pageSize, total - startIndex) - - if (pageSize == 0): - output = (@[], PagingInfo(pageSize: pageSize, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE) - return output - - # set the index of the last message in the page - var endIndex = startIndex + pageSize - 1 - - # retrieve the messages - var retMessages: seq[IndexedWakuMessage] - for i in startIndex..endIndex: - retMessages.add(msgList[i]) - output = (retMessages, PagingInfo(pageSize : pageSize, cursor : msgList[endIndex].index, direction : pinfo.direction), HistoryResponseError.NONE) - return output - - of PagingDirection.BACKWARD: - # set the index of the last message in the page - # exclude the message pointing by the cursor - var endIndex = cursorIndex - 1 - # for the initial query, include the message pointing by the cursor - if isInitialQuery: - endIndex = cursorIndex - - # adjust the pageSize based on the total remaining messages - pageSize = min(pageSize, endIndex + 1) - - if (pageSize == 0): - output = (@[], PagingInfo(pageSize: pageSize, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE) - return output - - # set the index of the first message in the page - var startIndex = endIndex - pageSize + 1 - - # retrieve the messages - var retMessages: seq[IndexedWakuMessage] - for i in startIndex..endIndex: - retMessages.add(msgList[i]) - output = (retMessages, PagingInfo(pageSize : pageSize, cursor : msgList[startIndex].index, direction : pinfo.direction), HistoryResponseError.NONE) - return output - - -proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = ## Extract query criteria ## All query criteria are optional let @@ -407,21 +287,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = return true - ## Filter history using predicate and sort on indexedWakuMessageComparison - ## TODO: since MaxPageSize is likely much smaller than w.messages.len, - ## we could optimise here by only filtering a portion of w.messages, - ## and repeat until we have populated a full page. - ## TODO: we can gain a lot by rather sorting on insert. Perhaps use a nim-stew - ## sorted set? - let filteredMsgs = w.messages.filterIt(it.matchesQuery) - .sorted(indexedWakuMessageComparison) - - ## Paginate the filtered messages - let (indexedWakuMsgList, updatedPagingInfo, error) = paginate(filteredMsgs, query.pagingInfo) - - ## Extract and return response let - wakuMsgList = indexedWakuMsgList.mapIt(it.msg) + # Read a page of history matching the query + (wakuMsgList, updatedPagingInfo, error) = w.messages.getPage(matchesQuery, query.pagingInfo) + # Build response historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error) return historyRes @@ -461,14 +330,14 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = ws.handler = handler ws.codec = WakuStoreCodec - ws.messages = initQueue(capacity) + ws.messages = StoreQueueRef.new(capacity) if ws.store.isNil: return proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) = # TODO index should not be recalculated - ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic)) + discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic)) info "attempting to load messages from persistent storage" @@ -505,8 +374,14 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = trace "handle message in WakuStore", topic=topic, msg=msg let index = msg.computeIndex() - w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) + let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) + + if addRes.isErr: + trace "Attempt to add message with duplicate index to store", msg=msg, index=index + waku_store_errors.inc(labelValues = ["duplicate"]) + waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) + if w.store.isNil: return @@ -637,20 +512,6 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI debug "failed to resolve the query" return err("failed to resolve the query") -proc findLastSeen*(list: seq[IndexedWakuMessage]): float = - var lastSeenTime = float64(0) - for iwmsg in list.items : - if iwmsg.msg.timestamp>lastSeenTime: - lastSeenTime = iwmsg.msg.timestamp - return lastSeenTime - -proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool = - ## return true if a duplicate message is found, otherwise false - # it is defined as a separate proc to be able to adjust comparison criteria - # e.g., to exclude timestamp or include pubsub topic - if message in list: return true - return false - proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db @@ -664,9 +525,13 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string var currentTime = epochTime() - var lastSeenTime: float = findLastSeen(ws.messages.allItems()) debug "resume", currentEpochTime=currentTime - + + let lastSeenItem = ws.messages.last() + + var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp + else: float64(0) + # adjust the time window with an offset of 20 seconds let offset: float64 = 200000 currentTime = currentTime + offset @@ -677,22 +542,21 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize) rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo) - var dismissed: uint = 0 var added: uint = 0 proc save(msgList: seq[WakuMessage]) = debug "save proc is called" # exclude index from the comparison criteria - let currentMsgSummary = ws.messages.mapIt(it.msg) + for msg in msgList: + let index = msg.computeIndex() # check for duplicate messages # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic - if isDuplicate(msg,currentMsgSummary): + if ws.messages.contains(index): dismissed = dismissed + 1 continue # store the new message - let index = msg.computeIndex() let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic) # store in db if exists @@ -702,8 +566,8 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem trace "failed to store messages", err = res.error waku_store_errors.inc(labelValues = ["store_failure"]) continue - - ws.messages.add(indexedWakuMsg) + + discard ws.messages.add(indexedWakuMsg) added = added + 1 waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 8184d1a1e..10b9a76bb 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -4,11 +4,11 @@ # Group by std, external then internal imports import + std/[algorithm, options], # external imports - std/sequtils, bearssl, libp2p/protocols/protocol, - stew/results, + stew/[results, sorted_set], # internal imports ../../node/storage/message/message_store, ../../utils/pagination, @@ -43,6 +43,8 @@ type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} + QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} + IndexedWakuMessage* = object # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message ## This type is used to encapsulate a WakuMessage and its Index @@ -86,7 +88,7 @@ type QueryResult* = Result[uint64, string] MessagesResult* = Result[seq[WakuMessage], string] - StoreQueue* = object + StoreQueueRef* = ref object ## Bounded repository for indexed messages ## ## The store queue will keep messages up to its @@ -97,14 +99,16 @@ type ## for new items. ## ## @ TODO: a circular/ring buffer may be a more efficient implementation - ## @ TODO: consider adding message hashes for easy duplicate checks - items: seq[IndexedWakuMessage] # FIFO queue of stored messages + ## @ TODO: we don't need to store the Index twice (as key and in the value) + items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages capacity: int # Maximum amount of messages to keep + + StoreQueueResult*[T] = Result[T, cstring] WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref BrHmacDrbgContext - messages*: StoreQueue + messages*: StoreQueueRef store*: MessageStore wakuSwap*: WakuSwap persistMessages*: bool @@ -113,30 +117,282 @@ type # StoreQueue helpers # ###################### -proc initQueue*(capacity: int): StoreQueue = - var storeQueue: StoreQueue - storeQueue.items = newSeqOfCap[IndexedWakuMessage](capacity) - storeQueue.capacity = capacity - return storeQueue +proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], + startCursor: Index): + SortedSetResult[Index, IndexedWakuMessage] = + ## Fast forward `w` to start cursor + ## TODO: can probably improve performance here with a binary/tree search + + var nextItem = w.first + + ## Fast forward until we reach the startCursor + while nextItem.isOk: + if nextItem.value.key == startCursor: + # Exit ffd loop when we find the start cursor + break -proc add*(storeQueue: var StoreQueue, msg: IndexedWakuMessage) {.noSideEffect.} = + # Not yet at cursor. Continue advancing + nextItem = w.next + + return nextItem + +proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], + startCursor: Index): + SortedSetResult[Index, IndexedWakuMessage] = + ## Rewind `w` to start cursor + ## TODO: can probably improve performance here with a binary/tree search + + var prevItem = w.last + + ## Rewind until we reach the startCursor + + while prevItem.isOk: + if prevItem.value.key == startCursor: + # Exit rwd loop when we find the start cursor + break + + # Not yet at cursor. Continue rewinding. + prevItem = w.prev + + return prevItem + +proc fwdPage(storeQueue: StoreQueueRef, + pred: QueryFilterMatcher, + maxPageSize: uint64, + startCursor: Option[Index]): + (seq[WakuMessage], PagingInfo, HistoryResponseError) = + ## Populate a single page in forward direction + ## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined. + ## Page size must not exceed `maxPageSize` + ## Each entry must match the `pred` + + var + outSeq: seq[WakuMessage] + outPagingInfo: PagingInfo + outError: HistoryResponseError + + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + currentEntry: SortedSetResult[Index, IndexedWakuMessage] + lastValidCursor: Index + + # Find first entry + if startCursor.isSome(): + lastValidCursor = startCursor.get() + + let cursorEntry = w.ffdToCursor(startCursor.get()) + if cursorEntry.isErr: + # Quick exit here if start cursor not found + outSeq = @[] + outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD) + outError = HistoryResponseError.INVALID_CURSOR + return (outSeq, outPagingInfo, outError) + + # Advance walker once more + currentEntry = w.next + else: + # Start from the beginning of the queue + lastValidCursor = Index() # No valid (only empty) last cursor + currentEntry = w.first + + ## This loop walks forward over the queue: + ## 1. from the given cursor (or first entry, if not provided) + ## 2. adds entries matching the predicate function to output page + ## 3. until either the end of the queue or maxPageSize is reached + var numberOfItems = 0.uint + while currentEntry.isOk and numberOfItems < maxPageSize: + if pred(currentEntry.value.data): + lastValidCursor = currentEntry.value.key + outSeq.add(currentEntry.value.data.msg) + numberOfItems += 1 + currentEntry = w.next + w.destroy + + outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, + cursor: lastValidCursor, + direction: PagingDirection.FORWARD) + + outError = HistoryResponseError.NONE + + return (outSeq, outPagingInfo, outError) + +proc bwdPage(storeQueue: StoreQueueRef, + pred: QueryFilterMatcher, + maxPageSize: uint64, + startCursor: Option[Index]): + (seq[WakuMessage], PagingInfo, HistoryResponseError) = + ## Populate a single page in backward direction + ## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined. + ## Page size must not exceed `maxPageSize` + ## Each entry must match the `pred` + + var + outSeq: seq[WakuMessage] + outPagingInfo: PagingInfo + outError: HistoryResponseError + + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + currentEntry: SortedSetResult[Index, IndexedWakuMessage] + lastValidCursor: Index + + # Find starting entry + if startCursor.isSome(): + lastValidCursor = startCursor.get() + + let cursorEntry = w.rwdToCursor(startCursor.get()) + if cursorEntry.isErr: + # Quick exit here if start cursor not found + outSeq = @[] + outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD) + outError = HistoryResponseError.INVALID_CURSOR + return (outSeq, outPagingInfo, outError) + + # Step walker one more step back + currentEntry = w.prev + else: + # Start from the back of the queue + lastValidCursor = Index() # No valid (only empty) last cursor + currentEntry = w.last + + ## This loop walks backward over the queue: + ## 1. from the given cursor (or last entry, if not provided) + ## 2. adds entries matching the predicate function to output page + ## 3. until either the beginning of the queue or maxPageSize is reached + var numberOfItems = 0.uint + while currentEntry.isOk and numberOfItems < maxPageSize: + if pred(currentEntry.value.data): + lastValidCursor = currentEntry.value.key + outSeq.add(currentEntry.value.data.msg) + numberOfItems += 1 + currentEntry = w.prev + w.destroy + + outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, + cursor: lastValidCursor, + direction: PagingDirection.BACKWARD) + outError = HistoryResponseError.NONE + + return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order + outPagingInfo, + outError) + +################## +# StoreQueue API # +################## + +## --- SortedSet accessors --- + +iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = + ## Forward iterator over the entire store queue + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.first + while res.isOk: + yield (res.value.key, res.value.data) + res = w.next + w.destroy + +iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = + ## Backwards iterator over the entire store queue + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.last + while res.isOk: + yield (res.value.key, res.value.data) + res = w.prev + w.destroy + +proc first*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] = + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.first + w.destroy + + if res.isOk: + return ok(res.value.data) + else: + return err("Not found") + +proc last*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] = + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.last + w.destroy + + if res.isOk: + return ok(res.value.data) + else: + return err("Not found") + +## --- Queue API --- + +proc new*(T: type StoreQueueRef, capacity: int): T = + var items = SortedSet[Index, IndexedWakuMessage].init() + + return StoreQueueRef(items: items, capacity: capacity) + +proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] = ## Add a message to the queue. ## If we're at capacity, we will be removing, - ## the oldest item - - if storeQueue.items.len >= storeQueue.capacity: - storeQueue.items.delete 0, 0 # Remove first item in queue + ## the oldest (first) item - storeQueue.items.add(msg) + # TODO the below delete block can be removed if we convert to circular buffer + if storeQueue.items.len >= storeQueue.capacity: + var + w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items) + toDelete = w.first + discard storeQueue.items.delete(toDelete.value.key) + w.destroy # better to destroy walker after a delete operation + + let res = storeQueue.items.insert(msg.index) + if res.isErr: + # This indicates the index already exists in the storeQueue. + # TODO: could return error result and log in metrics + return err("duplicate") + else: + res.value.data = msg + + return ok() -proc len*(storeQueue: StoreQueue): int {.noSideEffect.} = +proc getPage*(storeQueue: StoreQueueRef, + pred: QueryFilterMatcher, + pagingInfo: PagingInfo): + (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = + ## Get a single page of history matching the predicate and + ## adhering to the pagingInfo parameters + + let + cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! + else: some(pagingInfo.cursor) + maxPageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos + else: pagingInfo.pageSize + + case pagingInfo.direction + of FORWARD: + return storeQueue.fwdPage(pred, maxPageSize, cursorOpt) + of BACKWARD: + return storeQueue.bwdPage(pred, maxPageSize, cursorOpt) + +proc getPage*(storeQueue: StoreQueueRef, + pagingInfo: PagingInfo): + (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = + ## Get a single page of history without filtering. + ## Adhere to the pagingInfo parameters + + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + return getPage(storeQueue, predicate, pagingInfo) + +proc contains*(storeQueue: StoreQueueRef, index: Index): bool = + ## Return `true` if the store queue already contains the `index`, + ## `false` otherwise + let res = storeQueue.items.eq(index) + + return res.isOk() + +proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} = storeQueue.items.len -proc allItems*(storeQueue: StoreQueue): seq[IndexedWakuMessage] = - storeQueue.items - -template filterIt*(storeQueue: StoreQueue, pred: untyped): untyped = - storeQueue.items.filterIt(pred) - -template mapIt*(storeQueue: StoreQueue, op: untyped): untyped = - storeQueue.items.mapIt(op) +proc `$`*(storeQueue: StoreQueueRef): string = + $(storeQueue.items) \ No newline at end of file diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index d2c617e7a..dcd86e1b8 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -4,7 +4,11 @@ {.push raises: [Defect].} -import nimcrypto/hash +import + nimcrypto/hash, + stew/byteutils + +export hash type Index* = object @@ -12,3 +16,23 @@ type digest*: MDigest[256] receiverTime*: float64 senderTime*: float64 # the time at which the message is generated + +proc `==`*(x, y: Index): bool = + ## receiverTime plays no role in index comparison + (x.senderTime == y.senderTime) and (x.digest == y.digest) + +proc cmp*(x, y: Index): int = + ## compares x and y + ## returns 0 if they are equal + ## returns -1 if x < y + ## returns 1 if x > y + ## receiverTime plays no role in index comparison + + # Timestamp has a higher priority for comparison + let timecmp = cmp(x.senderTime, y.senderTime) + if timecmp != 0: + return timecmp + + # Only when timestamps are equal + let digestcm = cmp(x.digest.data, y.digest.data) + return digestcm