diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index 48db4423d..9cc7a522e 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -1,11 +1,21 @@ {.used.} import - std/unittest, + std/[unittest,algorithm,options], + nimcrypto/sha2, ../../waku/node/v2/waku_types, + ../../waku/protocol/v2/waku_store, ../test_helpers + +proc createSampleList(s: int): seq[IndexedWakuMessage] = + ## takes s as input and outputs a sequence with s amount of IndexedWakuMessage + var data {.noinit.}: array[32, byte] + for x in data.mitems: x = 1 + for i in 0.. y + let + timecmp = system.cmp(x.receivedTime, y.receivedTime) + digestcm = system.cmp(x.digest.data, y.digest.data) + if timecmp != 0: # timestamp has a higher priority for comparison + return timecmp + return digestcm + +proc indexedWakuMessageComparison*(x, y: IndexedWakuMessage): int = + ## compares x and y + ## returns 0 if they are equal + ## returns -1 if x < y + ## returns 1 if x > y + result = indexComparison(x.index, y.index) + +proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] = + ## returns the position of an IndexedWakuMessage in msgList whose index value matches the given index + ## returns none if no match is found + for i, indexedWakuMessage in msgList: + if indexedWakuMessage.index == index: + return some(i) + return none(int) + +proc paginateWithIndex*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo) = + ## takes list, and performs paging based on pinfo + ## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request + var + cursor = pinfo.cursor + pageSize = pinfo.pageSize + dir = pinfo.direction + + if pageSize == 0: # pageSize being zero indicates that no pagination is required + return (list, pinfo) + + if list.len == 0: # no pagination is needed for an empty list + return (list, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction)) + + var msgList = list # makes a copy of the list + # sorts msgList based on the custom comparison proc indexedWakuMessageComparison + msgList.sort(indexedWakuMessageComparison) + + var initQuery = false + if cursor == Index(): + initQuery = true # an empty cursor means it is an intial query + case dir + of PagingDirection.FORWARD: + cursor = list[0].index # perform paging from the begining of the list + of PagingDirection.BACKWARD: + cursor = list[list.len - 1].index # perform paging from the end of the list + var foundIndexOption = msgList.findIndex(cursor) + if foundIndexOption.isNone: # the cursor is not valid + return (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction)) + var foundIndex = foundIndexOption.get() + var retrievedPageSize, s, e: int + var newCursor: Index # to be returned as part of the new paging info + case dir + of PagingDirection.FORWARD: # forward pagination + let remainingMessages= msgList.len - foundIndex - 1 + # the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex + retrievedPageSize = min(int(pageSize), MaxPageSize).min(remainingMessages) + if initQuery : foundIndex = foundIndex - 1 + s = foundIndex + 1 # non inclusive + e = foundIndex + retrievedPageSize + newCursor = msgList[e].index # the new cursor points to the end of the page + of PagingDirection.BACKWARD: # backward pagination + let remainingMessages=foundIndex + # the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0 + retrievedPageSize = min(int(pageSize), MaxPageSize).min(remainingMessages) + if initQuery : foundIndex = foundIndex + 1 + s = foundIndex - retrievedPageSize + e = foundIndex - 1 + newCursor = msgList[s].index # the new cursor points to the begining of the page + + # retrieve the messages + for i in s..e: + result[0].add(msgList[i]) + + result[1] = PagingInfo(pageSize : uint64(retrievedPageSize), cursor : newCursor, direction : pinfo.direction) + + +proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[WakuMessage], PagingInfo) = + ## takes list, and perfomrs paging based on pinfo + ## returns the page i.e, a sequence of WakuMessage and the new paging info to be used for the next paging request + var (indexedData, updatedPagingInfo) = paginateWithIndex(list,pinfo) + for indexedMsg in indexedData: + result[0].add(indexedMsg.msg) + result[1] = updatedPagingInfo + proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(messages: newSeq[WakuMessage]()) - for msg in w.messages: - if msg.contentTopic in query.topics: - result.messages.insert(msg) + # data holds IndexedWakuMessage whose topics match the query + var data = w.messages.filterIt(it.msg.contentTopic in query.topics) + + # perform pagination + (result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo) + method init*(ws: WakuStore) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = @@ -191,7 +286,8 @@ method init*(ws: WakuStore) = let value = res.value let response = ws.findMessages(res.value.query) - await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer) + await conn.writeLp(HistoryRPC(requestId: value.requestId, + response: response).encode().buffer) ws.handler = handle ws.codec = WakuStoreCodec @@ -212,7 +308,9 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## the filter should be used by the component that receives ## new messages. proc handle(topic: string, msg: WakuMessage) {.async.} = - proto.messages.add(msg) + let index = msg.computeIndex() + proto.messages.add(IndexedWakuMessage(msg: msg, index: index)) + MessageNotificationSubscription.init(@[], handle) @@ -227,7 +325,8 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn let peer = w.peers[0] let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) - await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer) + await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), + query: query).encode().buffer) var message = await conn.readLp(64*1024) let response = HistoryRPC.init(message)