From ea5f9993a7023b186adf3c746b166287084c3306 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Sun, 8 Nov 2020 20:48:09 -0800 Subject: [PATCH] Pagination feature/indexing waku messages (#233) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * changes the digest type to MDigest[256] and modifies the computeIndex * fixes formatting issue * adds the pagination with its tests stores and retrieves IndexedWakuMessage adds the paginate proc adds the paginate function fixes some formatting issues minor edits indentation and fixes a bug removes unused imports minor fixes indentations and adds a new testcase adds indexedWakuMessageComparison adds `==` proc for IndexedWakuMessage separates the comparison of index and indexed waku messages adds testcases for the Index comparison and IndexedWakuMessage comparison WIP WIP: adds an decoder for Index removes an unnecessary imports WIP adds findIndex() proc removes the equality check '==' for IndexedWakuMessages edits the code format and adds the pagination test edits paginate() proc to work on a copy of the input list deletes unnecessary echo adds the boundary calculations for forward and backward pagination adds test cases for the page boundaries tests corner cases for the queried cursor and pagesize minor adds some comments adds a proc to extract WakuMessages from a list of IndexedWakuMessages integrates pagination into the findMessages proc adds some comments changes paginate to paginateWithIndex removes some echos modifies paginateWithIndex to handle invalid cursors adds test case for an invalid cursor WIP: adds a `$` proc for IndexedWakuMessages adds some debugging message prints adds an integration test for handling query with pagination * fixes a type mismatch issue in the min proc * replaces boolean direction with their enums and updates contentTopics * adds the unit test for the sorting of the indexed waku messages * fixes a flaky test * fixes a flaky test * removes index equality check proc * handles an initial query with an empty cursor * adds test for the initial query * adds integration test for pagination * adds a test for empty message list * adds comments and fixes an issue * adds comments * code cleanup * removes the content topic validation check * resolves the errors related to the windows CI tests * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén * Update tests/v2/test_waku_pagination.nim Co-authored-by: Oskar Thorén * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén * changes the output type of findIndex to Option * Update tests/v2/test_waku_pagination.nim Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Update tests/v2/test_waku_pagination.nim Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Update tests/v2/test_waku_pagination.nim Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * adds some comments * fixes an indentation issue * some code modification for array initialization * Apply suggestions from code review Co-authored-by: Oskar Thorén Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * does some code reorganizations and clean up * CreateSampleList to createSampleList * replaces a byte array literal initialization with a for loop * relocates indexedWakuMessageComparison and indexComparison * minor Co-authored-by: Oskar Thorén Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> --- tests/v2/test_waku_pagination.nim | 199 +++++++++++++++++++++++++++++- tests/v2/test_waku_store.nim | 152 +++++++++++++++++++++++ waku/node/v2/waku_types.nim | 22 ++-- waku/protocol/v2/waku_store.nim | 115 +++++++++++++++-- 4 files changed, 463 insertions(+), 25 deletions(-) diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index 48db4423d..9cc7a522e 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -1,11 +1,21 @@ {.used.} import - std/unittest, + std/[unittest,algorithm,options], + nimcrypto/sha2, ../../waku/node/v2/waku_types, + ../../waku/protocol/v2/waku_store, ../test_helpers + +proc createSampleList(s: int): seq[IndexedWakuMessage] = + ## takes s as input and outputs a sequence with s amount of IndexedWakuMessage + var data {.noinit.}: array[32, byte] + for x in data.mitems: x = 1 + for i in 0.. y + let + timecmp = system.cmp(x.receivedTime, y.receivedTime) + digestcm = system.cmp(x.digest.data, y.digest.data) + if timecmp != 0: # timestamp has a higher priority for comparison + return timecmp + return digestcm + +proc indexedWakuMessageComparison*(x, y: IndexedWakuMessage): int = + ## compares x and y + ## returns 0 if they are equal + ## returns -1 if x < y + ## returns 1 if x > y + result = indexComparison(x.index, y.index) + +proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] = + ## returns the position of an IndexedWakuMessage in msgList whose index value matches the given index + ## returns none if no match is found + for i, indexedWakuMessage in msgList: + if indexedWakuMessage.index == index: + return some(i) + return none(int) + +proc paginateWithIndex*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo) = + ## takes list, and performs paging based on pinfo + ## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request + var + cursor = pinfo.cursor + pageSize = pinfo.pageSize + dir = pinfo.direction + + if pageSize == 0: # pageSize being zero indicates that no pagination is required + return (list, pinfo) + + if list.len == 0: # no pagination is needed for an empty list + return (list, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction)) + + var msgList = list # makes a copy of the list + # sorts msgList based on the custom comparison proc indexedWakuMessageComparison + msgList.sort(indexedWakuMessageComparison) + + var initQuery = false + if cursor == Index(): + initQuery = true # an empty cursor means it is an intial query + case dir + of PagingDirection.FORWARD: + cursor = list[0].index # perform paging from the begining of the list + of PagingDirection.BACKWARD: + cursor = list[list.len - 1].index # perform paging from the end of the list + var foundIndexOption = msgList.findIndex(cursor) + if foundIndexOption.isNone: # the cursor is not valid + return (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction)) + var foundIndex = foundIndexOption.get() + var retrievedPageSize, s, e: int + var newCursor: Index # to be returned as part of the new paging info + case dir + of PagingDirection.FORWARD: # forward pagination + let remainingMessages= msgList.len - foundIndex - 1 + # the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex + retrievedPageSize = min(int(pageSize), MaxPageSize).min(remainingMessages) + if initQuery : foundIndex = foundIndex - 1 + s = foundIndex + 1 # non inclusive + e = foundIndex + retrievedPageSize + newCursor = msgList[e].index # the new cursor points to the end of the page + of PagingDirection.BACKWARD: # backward pagination + let remainingMessages=foundIndex + # the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0 + retrievedPageSize = min(int(pageSize), MaxPageSize).min(remainingMessages) + if initQuery : foundIndex = foundIndex + 1 + s = foundIndex - retrievedPageSize + e = foundIndex - 1 + newCursor = msgList[s].index # the new cursor points to the begining of the page + + # retrieve the messages + for i in s..e: + result[0].add(msgList[i]) + + result[1] = PagingInfo(pageSize : uint64(retrievedPageSize), cursor : newCursor, direction : pinfo.direction) + + +proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[WakuMessage], PagingInfo) = + ## takes list, and perfomrs paging based on pinfo + ## returns the page i.e, a sequence of WakuMessage and the new paging info to be used for the next paging request + var (indexedData, updatedPagingInfo) = paginateWithIndex(list,pinfo) + for indexedMsg in indexedData: + result[0].add(indexedMsg.msg) + result[1] = updatedPagingInfo + proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(messages: newSeq[WakuMessage]()) - for msg in w.messages: - if msg.contentTopic in query.topics: - result.messages.insert(msg) + # data holds IndexedWakuMessage whose topics match the query + var data = w.messages.filterIt(it.msg.contentTopic in query.topics) + + # perform pagination + (result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo) + method init*(ws: WakuStore) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = @@ -191,7 +286,8 @@ method init*(ws: WakuStore) = let value = res.value let response = ws.findMessages(res.value.query) - await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer) + await conn.writeLp(HistoryRPC(requestId: value.requestId, + response: response).encode().buffer) ws.handler = handle ws.codec = WakuStoreCodec @@ -212,7 +308,9 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## the filter should be used by the component that receives ## new messages. proc handle(topic: string, msg: WakuMessage) {.async.} = - proto.messages.add(msg) + let index = msg.computeIndex() + proto.messages.add(IndexedWakuMessage(msg: msg, index: index)) + MessageNotificationSubscription.init(@[], handle) @@ -227,7 +325,8 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn let peer = w.peers[0] let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) - await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer) + await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), + query: query).encode().buffer) var message = await conn.readLp(64*1024) let response = HistoryRPC.init(message)