diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index c0d40d624..4b1f813ef 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -6,9 +6,9 @@ import ./v2/test_wakunode_relay, ./v2/test_wakunode_lightpush, # Waku Store - ./v2/test_utils_pagination, - ./v2/test_message_store_queue, + ./v2/test_message_store_queue_index, ./v2/test_message_store_queue_pagination, + ./v2/test_message_store_queue, ./v2/test_message_store_sqlite_query, ./v2/test_message_store_sqlite, ./v2/test_waku_store_rpc_codec, diff --git a/tests/v2/test_message_store_queue.nim b/tests/v2/test_message_store_queue.nim index 89832f4c0..5cc851506 100644 --- a/tests/v2/test_message_store_queue.nim +++ b/tests/v2/test_message_store_queue.nim @@ -399,7 +399,7 @@ procSuite "Sorted store queue": proc predicate(i: IndexedWakuMessage): bool = true # no filtering - let cursor = Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()) + let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()) let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD) ## When @@ -423,7 +423,7 @@ procSuite "Sorted store queue": proc predicate(i: IndexedWakuMessage): bool = true # no filtering - let cursor = Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()) + let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()) let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD) ## When diff --git a/tests/v2/test_utils_pagination.nim b/tests/v2/test_message_store_queue_index.nim similarity index 99% rename from tests/v2/test_utils_pagination.nim rename to tests/v2/test_message_store_queue_index.nim index 1e24e3f99..c5568d1cf 100644 --- a/tests/v2/test_utils_pagination.nim +++ b/tests/v2/test_message_store_queue_index.nim @@ -8,7 +8,7 @@ import import ../../waku/v2/protocol/waku_message, ../../waku/v2/utils/time, - ../../waku/v2/utils/pagination + ../../waku/v2/node/storage/message/queue_store/index const diff --git a/tests/v2/test_message_store_queue_pagination.nim b/tests/v2/test_message_store_queue_pagination.nim index 5ace2ca3f..5a56fd126 100644 --- a/tests/v2/test_message_store_queue_pagination.nim +++ b/tests/v2/test_message_store_queue_pagination.nim @@ -43,40 +43,41 @@ proc getTestTimestamp(): Timestamp = suite "Queue store - pagination": test "Forward pagination test": - var - stQ = getTestStoreQueue(10) - indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification - msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification - pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.FORWARD) + let + store = getTestStoreQueue(10) + indexList = toSeq(store.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification + msgList = toSeq(store.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification + + var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) # test for a normal pagination - var (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + var (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 2 data == msgList[4..5] - newPagingInfo.cursor == indexList[5] + newPagingInfo.cursor == indexList[5].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == pagingInfo.pageSize error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 2 data == msgList[0..1] - newPagingInfo.cursor == indexList[1] + newPagingInfo.cursor == indexList[1].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 2 error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor to fetch the entire history pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 10 data == msgList[0..9] - newPagingInfo.cursor == indexList[9] + newPagingInfo.cursor == indexList[9].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 10 error == HistoryResponseError.NONE @@ -92,19 +93,19 @@ suite "Queue store - pagination": error == HistoryResponseError.NONE # test for a page size larger than the remaining messages - pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3], direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 6 data == msgList[4..9] - newPagingInfo.cursor == indexList[9] + newPagingInfo.cursor == indexList[9].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 6 error == HistoryResponseError.NONE # test for a page size larger than the maximum allowed page size - pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: uint64(data.len) <= MaxPageSize newPagingInfo.direction == pagingInfo.direction @@ -112,19 +113,19 @@ suite "Queue store - pagination": error == HistoryResponseError.NONE # test for a cursor pointing to the end of the message list - pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9], direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 0 - newPagingInfo.cursor == indexList[9] + newPagingInfo.cursor == indexList[9].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 error == HistoryResponseError.NONE # test for an invalid cursor - let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) + let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 0 newPagingInfo.cursor == pagingInfo.cursor @@ -138,34 +139,35 @@ suite "Queue store - pagination": (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) check: data.len == 1 - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 1 error == HistoryResponseError.NONE # test pagination over a message list with one message singleItemMsgList = getTestStoreQueue(1) - pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.FORWARD) + pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD) (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) check: data.len == 0 - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 error == HistoryResponseError.NONE test "Backward pagination test": - var - stQ = getTestStoreQueue(10) - indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification - msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification - pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.BACKWARD) + let + store = getTestStoreQueue(10) + indexList = toSeq(store.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification + msgList = toSeq(store.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification + + var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) # test for a normal pagination - var (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + var (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data == msgList[1..2] - newPagingInfo.cursor == indexList[1] + newPagingInfo.cursor == indexList[1].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == pagingInfo.pageSize error == HistoryResponseError.NONE @@ -182,39 +184,39 @@ suite "Queue store - pagination": # test for an initial pagination request with an empty cursor pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 2 data == msgList[8..9] - newPagingInfo.cursor == indexList[8] + newPagingInfo.cursor == indexList[8].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 2 error == HistoryResponseError.NONE # test for an initial pagination request with an empty cursor to fetch the entire history pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 10 data == msgList[0..9] - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 10 error == HistoryResponseError.NONE # test for a page size larger than the remaining messages - pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3], direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data == msgList[0..2] - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 3 error == HistoryResponseError.NONE # test for a page size larger than the Maximum allowed page size - pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: uint64(data.len) <= MaxPageSize newPagingInfo.direction == pagingInfo.direction @@ -222,20 +224,20 @@ suite "Queue store - pagination": error == HistoryResponseError.NONE # test for a cursor pointing to the begining of the message list - pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0], direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 0 - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 error == HistoryResponseError.NONE # test for an invalid cursor - let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) + let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD) - (data, newPagingInfo, error) = getPage(stQ, pagingInfo) + (data, newPagingInfo, error) = getPage(store, pagingInfo) check: data.len == 0 newPagingInfo.cursor == pagingInfo.cursor @@ -249,18 +251,18 @@ suite "Queue store - pagination": (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) check: data.len == 1 - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 1 error == HistoryResponseError.NONE # test paging query over a message list with one message singleItemMsgList = getTestStoreQueue(1) - pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.BACKWARD) + pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD) (data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo) check: data.len == 0 - newPagingInfo.cursor == indexList[0] + newPagingInfo.cursor == indexList[0].toPagingIndex() newPagingInfo.direction == pagingInfo.direction newPagingInfo.pageSize == 0 error == HistoryResponseError.NONE diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index 4fc5039c8..0a3c4f7ed 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -161,11 +161,11 @@ suite "Message Store": WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3), ] - var indexes: seq[Index] = @[] + var indexes: seq[PagingIndex] = @[] for msg in msgs: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let index = PagingIndex.compute(msg, msg.timestamp, DefaultPubsubTopic) indexes.add(index) ## When diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index 8d1f6008c..d73319b57 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -267,7 +267,7 @@ suite "message store - history query": for msg in messages: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) + let cursor = PagingIndex.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) ## When let res = store.getMessagesByHistoryQuery( @@ -318,7 +318,7 @@ suite "message store - history query": for msg in messages: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) + let cursor = PagingIndex.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) ## When let res = store.getMessagesByHistoryQuery( @@ -372,7 +372,7 @@ suite "message store - history query": for msg in messages2: require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) + let cursor = PagingIndex.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) ## When let res = store.getMessagesByHistoryQuery( @@ -658,7 +658,7 @@ suite "message store - history query": let digest = computeDigest(msg) require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() - let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) + let cursor = PagingIndex.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) ## When let res = store.getMessagesByHistoryQuery( diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index d3813f6de..5563fd320 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -345,7 +345,7 @@ suite "Waku Store - history query": response.messages.len() == 2 response.pagingInfo.pageSize == 2 response.pagingInfo.direction == PagingDirection.FORWARD - response.pagingInfo.cursor != Index() + response.pagingInfo.cursor != PagingIndex() ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) @@ -397,7 +397,7 @@ suite "Waku Store - history query": response.messages.len() == 2 response.pagingInfo.pageSize == 2 response.pagingInfo.direction == PagingDirection.BACKWARD - response.pagingInfo.cursor != Index() + response.pagingInfo.cursor != PagingIndex() ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) @@ -448,7 +448,7 @@ suite "Waku Store - history query": response.messages.len() == 8 response.pagingInfo.pageSize == 8 response.pagingInfo.direction == PagingDirection.BACKWARD - response.pagingInfo.cursor != Index() + response.pagingInfo.cursor != PagingIndex() ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/tests/v2/test_waku_store_rpc_codec.nim b/tests/v2/test_waku_store_rpc_codec.nim index 982a07e32..2b52415fe 100644 --- a/tests/v2/test_waku_store_rpc_codec.nim +++ b/tests/v2/test_waku_store_rpc_codec.nim @@ -32,13 +32,13 @@ proc fakeWakuMessage( procSuite "Waku Store - RPC codec": - test "Index protobuf codec": + test "PagingIndex protobuf codec": ## Given - let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) + let index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) ## When let encodedIndex = index.encode() - let decodedIndexRes = Index.init(encodedIndex.buffer) + let decodedIndexRes = PagingIndex.init(encodedIndex.buffer) ## Then check: @@ -49,12 +49,12 @@ procSuite "Waku Store - RPC codec": # The fields of decodedIndex must be the same as the original index decodedIndex == index - test "Index protobuf codec - empty index": + test "PagingIndex protobuf codec - empty index": ## Given - let emptyIndex = Index() + let emptyIndex = PagingIndex() let encodedIndex = emptyIndex.encode() - let decodedIndexRes = Index.init(encodedIndex.buffer) + let decodedIndexRes = PagingIndex.init(encodedIndex.buffer) ## Then check: @@ -62,13 +62,13 @@ procSuite "Waku Store - RPC codec": let decodedIndex = decodedIndexRes.tryGet() check: - # Check the correctness of init and encode for an empty Index + # Check the correctness of init and encode for an empty PagingIndex decodedIndex == emptyIndex test "PagingInfo protobuf codec": ## Given let - index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) + index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD) ## When @@ -103,7 +103,7 @@ procSuite "Waku Store - RPC codec": test "HistoryQuery protobuf codec": ## Given let - index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) + index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic), HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) @@ -139,7 +139,7 @@ procSuite "Waku Store - RPC codec": ## Given let message = fakeWakuMessage() - index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) + index = PagingIndex.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR) diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index 077991021..0f96f724e 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -15,7 +15,7 @@ type StorePagingOptions* = object ## This type holds some options for pagination pageSize*: uint64 - cursor*: Option[Index] + cursor*: Option[PagingIndex] forward*: bool WakuRelayMessage* = object diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index 1f09b402a..9c1b968a0 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -29,7 +29,7 @@ proc `%`*(value: WakuMessage): JsonNode = proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfo = PagingInfo(pageSize: pagingOptions.pageSize, - cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: Index(), + cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndex(), direction: if pagingOptions.forward: PagingDirection.FORWARD else: PagingDirection.BACKWARD) proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions = diff --git a/waku/v2/node/storage/message/dual_message_store.nim b/waku/v2/node/storage/message/dual_message_store.nim index 34d1a2e10..afab59d48 100644 --- a/waku/v2/node/storage/message/dual_message_store.nim +++ b/waku/v2/node/storage/message/dual_message_store.nim @@ -60,7 +60,7 @@ method getMessagesByHistoryQuery*( s: DualMessageStore, contentTopic = none(seq[ContentTopic]), pubsubTopic = none(string), - cursor = none(Index), + cursor = none(PagingIndex), startTime = none(Timestamp), endTime = none(Timestamp), maxPageSize = MaxPageSize, diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 1414857eb..50b284f0b 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -39,7 +39,7 @@ method getMessagesByHistoryQuery*( ms: MessageStore, contentTopic = none(seq[ContentTopic]), pubsubTopic = none(string), - cursor = none(Index), + cursor = none(PagingIndex), startTime = none(Timestamp), endTime = none(Timestamp), maxPageSize = DefaultPageSize, diff --git a/waku/v2/node/storage/message/queue_store/index.nim b/waku/v2/node/storage/message/queue_store/index.nim new file mode 100644 index 000000000..7da825f8e --- /dev/null +++ b/waku/v2/node/storage/message/queue_store/index.nim @@ -0,0 +1,89 @@ +{.push raises: [Defect].} + +import + stew/byteutils, + nimcrypto/sha2 +import + ../../../../protocol/waku_message, + ../../../../utils/time, + ../../../../utils/pagination + + +type Index* = object + ## This type contains the description of an Index used in the pagination of WakuMessages + pubsubTopic*: string + senderTime*: Timestamp # the time at which the message is generated + receiverTime*: Timestamp + digest*: MessageDigest # calculated over payload and content topic + +proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T = + ## Takes a WakuMessage with received timestamp and returns its Index. + let + digest = computeDigest(msg) + senderTime = msg.timestamp + + Index( + pubsubTopic: pubsubTopic, + senderTime: senderTime, + receiverTime: receivedTime, + digest: digest + ) + + +proc toPagingIndex*(index: Index): PagingIndex = + PagingIndex( + pubsubTopic: index.pubsubTopic, + senderTime: index.senderTime, + receiverTime: index.receiverTime, + digest: index.digest + ) + +proc toIndex*(index: PagingIndex): Index = + Index( + pubsubTopic: index.pubsubTopic, + senderTime: index.senderTime, + receiverTime: index.receiverTime, + digest: index.digest + ) + + +proc `==`*(x, y: Index): bool = + ## receiverTime plays no role in index equality + (x.senderTime == y.senderTime) and + (x.digest == y.digest) and + (x.pubsubTopic == y.pubsubTopic) + +proc cmp*(x, y: Index): int = + ## compares x and y + ## returns 0 if they are equal + ## returns -1 if x < y + ## returns 1 if x > y + ## + ## Default sorting order priority is: + ## 1. senderTimestamp + ## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal) + ## 3. message digest + ## 4. pubsubTopic + + if x == y: + # Quick exit ensures receiver time does not affect index equality + return 0 + + # Timestamp has a higher priority for comparison + let + # Use receiverTime where senderTime is unset + xTimestamp = if x.senderTime == 0: x.receiverTime + else: x.senderTime + yTimestamp = if y.senderTime == 0: y.receiverTime + else: y.senderTime + + let timecmp = cmp(xTimestamp, yTimestamp) + if timecmp != 0: + return timecmp + + # Continue only when timestamps are equal + let digestcmp = cmp(x.digest.data, y.digest.data) + if digestcmp != 0: + return digestcmp + + return cmp(x.pubsubTopic, y.pubsubTopic) diff --git a/waku/v2/node/storage/message/queue_store/queue_store.nim b/waku/v2/node/storage/message/queue_store/queue_store.nim new file mode 100644 index 000000000..90c9d5801 --- /dev/null +++ b/waku/v2/node/storage/message/queue_store/queue_store.nim @@ -0,0 +1,448 @@ +{.push raises: [Defect].} + +import + std/[options, algorithm, times], + stew/[results, sorted_set], + chronicles +import + ../../../../protocol/waku_message, + ../../../../protocol/waku_store/rpc, + ../../../../utils/pagination, + ../../../../utils/time, + ../message_store, + ./index + + +logScope: + topics = "message_store.storequeue" + + +const StoreQueueDefaultMaxCapacity* = 25_000 + + +type + IndexedWakuMessage* = object + # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message + ## This type is used to encapsulate a WakuMessage and its Index + msg*: WakuMessage + index*: Index + pubsubTopic*: string + + QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} + + +type + StoreQueueRef* = ref object of MessageStore + ## Bounded repository for indexed messages + ## + ## The store queue will keep messages up to its + ## configured capacity. As soon as this capacity + ## is reached and a new message is added, the oldest + ## item will be removed to make space for the new one. + ## This implies both a `delete` and `add` operation + ## for new items. + ## + ## @ TODO: a circular/ring buffer may be a more efficient implementation + ## @ TODO: we don't need to store the Index twice (as key and in the value) + items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages + capacity: int # Maximum amount of messages to keep + + +### Helpers + +proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], + startCursor: Index): + SortedSetResult[Index, IndexedWakuMessage] = + ## Fast forward `w` to start cursor + ## TODO: can probably improve performance here with a binary/tree search + + var nextItem = w.first + + trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem + + ## Fast forward until we reach the startCursor + while nextItem.isOk: + if nextItem.value.key == startCursor: + # Exit ffd loop when we find the start cursor + break + + # Not yet at cursor. Continue advancing + nextItem = w.next + trace "Continuing ffd to start cursor", nextItem=nextItem + + return nextItem + +proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], + startCursor: Index): + SortedSetResult[Index, IndexedWakuMessage] = + ## Rewind `w` to start cursor + ## TODO: can probably improve performance here with a binary/tree search + + var prevItem = w.last + + trace "Rewinding to start cursor", startCursor=startCursor, lastItem=prevItem + + ## Rewind until we reach the startCursor + + while prevItem.isOk: + if prevItem.value.key == startCursor: + # Exit rwd loop when we find the start cursor + break + + # Not yet at cursor. Continue rewinding. + prevItem = w.prev + trace "Continuing rewind to start cursor", prevItem=prevItem + + return prevItem + +proc fwdPage(storeQueue: StoreQueueRef, + pred: QueryFilterMatcher, + maxPageSize: uint64, + startCursor: Option[Index]): + (seq[WakuMessage], PagingInfo, HistoryResponseError) = + ## Populate a single page in forward direction + ## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined. + ## Page size must not exceed `maxPageSize` + ## Each entry must match the `pred` + + trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor + + var + outSeq: seq[WakuMessage] + outPagingInfo: PagingInfo + outError: HistoryResponseError + + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + currentEntry: SortedSetResult[Index, IndexedWakuMessage] + lastValidCursor: Index + + # Find first entry + if startCursor.isSome(): + lastValidCursor = startCursor.get() + + let cursorEntry = w.ffdToCursor(startCursor.get()) + if cursorEntry.isErr: + # Quick exit here if start cursor not found + trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor + outSeq = @[] + outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.FORWARD) + outError = HistoryResponseError.INVALID_CURSOR + w.destroy + return (outSeq, outPagingInfo, outError) + + # Advance walker once more + currentEntry = w.next + else: + # Start from the beginning of the queue + lastValidCursor = Index() # No valid (only empty) last cursor + currentEntry = w.first + + trace "Starting fwd page query", currentEntry=currentEntry + + ## This loop walks forward over the queue: + ## 1. from the given cursor (or first entry, if not provided) + ## 2. adds entries matching the predicate function to output page + ## 3. until either the end of the queue or maxPageSize is reached + var numberOfItems = 0.uint + while currentEntry.isOk and numberOfItems < maxPageSize: + + trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems + + if pred(currentEntry.value.data): + trace "Current item matches predicate. Adding to output." + lastValidCursor = currentEntry.value.key + outSeq.add(currentEntry.value.data.msg) + numberOfItems += 1 + currentEntry = w.next + w.destroy + + outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, + cursor: lastValidCursor.toPagingIndex(), + direction: PagingDirection.FORWARD) + + outError = HistoryResponseError.NONE + + trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo + + return (outSeq, outPagingInfo, outError) + +proc bwdPage(storeQueue: StoreQueueRef, + pred: QueryFilterMatcher, + maxPageSize: uint64, + startCursor: Option[Index]): + (seq[WakuMessage], PagingInfo, HistoryResponseError) = + ## Populate a single page in backward direction + ## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined. + ## Page size must not exceed `maxPageSize` + ## Each entry must match the `pred` + + trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor + + var + outSeq: seq[WakuMessage] + outPagingInfo: PagingInfo + outError: HistoryResponseError + + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + currentEntry: SortedSetResult[Index, IndexedWakuMessage] + lastValidCursor: Index + + # Find starting entry + if startCursor.isSome(): + lastValidCursor = startCursor.get() + + let cursorEntry = w.rwdToCursor(startCursor.get()) + if cursorEntry.isErr(): + # Quick exit here if start cursor not found + trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor + outSeq = @[] + outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.BACKWARD) + outError = HistoryResponseError.INVALID_CURSOR + w.destroy + return (outSeq, outPagingInfo, outError) + + # Step walker one more step back + currentEntry = w.prev + else: + # Start from the back of the queue + lastValidCursor = Index() # No valid (only empty) last cursor + currentEntry = w.last + + trace "Starting bwd page query", currentEntry=currentEntry + + ## This loop walks backward over the queue: + ## 1. from the given cursor (or last entry, if not provided) + ## 2. adds entries matching the predicate function to output page + ## 3. until either the beginning of the queue or maxPageSize is reached + var numberOfItems = 0.uint + while currentEntry.isOk() and numberOfItems < maxPageSize: + trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems + + if pred(currentEntry.value.data): + trace "Current item matches predicate. Adding to output." + lastValidCursor = currentEntry.value.key + outSeq.add(currentEntry.value.data.msg) + numberOfItems += 1 + currentEntry = w.prev + w.destroy + + outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, + cursor: lastValidCursor.toPagingIndex(), + direction: PagingDirection.BACKWARD) + outError = HistoryResponseError.NONE + + trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo + + return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order + outPagingInfo, + outError) + + +#### API + +proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T = + var items = SortedSet[Index, IndexedWakuMessage].init() + return StoreQueueRef(items: items, capacity: capacity) + + +proc contains*(store: StoreQueueRef, index: Index): bool = + ## Return `true` if the store queue already contains the `index`, `false` otherwise. + store.items.eq(index).isOk() + +proc len*(store: StoreQueueRef): int {.noSideEffect.} = + store.items.len + + +## --- SortedSet accessors --- + +iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = + ## Forward iterator over the entire store queue + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.first() + + while res.isOk(): + yield (res.value.key, res.value.data) + res = w.next() + + w.destroy() + +iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = + ## Backwards iterator over the entire store queue + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.last() + + while res.isOk(): + yield (res.value.key, res.value.data) + res = w.prev() + + w.destroy() + +proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items) + res = w.first() + w.destroy() + + if res.isErr(): + return err("Not found") + + return ok(res.value.data) + +proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) + res = w.last() + w.destroy() + + if res.isErr(): + return err("Not found") + + return ok(res.value.data) + + +## --- Queue API --- + +proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] = + ## Add a message to the queue + ## + ## If we're at capacity, we will be removing, the oldest (first) item + if store.contains(msg.index): + trace "could not add item to store queue. Index already exists", index=msg.index + return err("duplicate") + + # TODO: the below delete block can be removed if we convert to circular buffer + if store.items.len >= store.capacity: + var + w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items) + firstItem = w.first + + if cmp(msg.index, firstItem.value.key) < 0: + # When at capacity, we won't add if message index is smaller (older) than our oldest item + w.destroy # Clean up walker + return err("too_old") + + discard store.items.delete(firstItem.value.key) + w.destroy # better to destroy walker after a delete operation + + store.items.insert(msg.index).value.data = msg + + return ok() + + +method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] = + let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) + let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) + store.add(message) + +method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = + let + now = getNanosecondTime(getTime().toUnixFloat()) + digest = computeDigest(message) + store.put(pubsubTopic, message, digest, now) + + +proc getPage*(storeQueue: StoreQueueRef, + pred: QueryFilterMatcher, + pagingInfo: PagingInfo): + (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = + ## Get a single page of history matching the predicate and + ## adhering to the pagingInfo parameters + + trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo + + let + cursorOpt = if pagingInfo.cursor == PagingIndex(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! + else: some(pagingInfo.cursor.toIndex()) + maxPageSize = pagingInfo.pageSize + + case pagingInfo.direction + of PagingDirection.FORWARD: + return storeQueue.fwdPage(pred, maxPageSize, cursorOpt) + of PagingDirection.BACKWARD: + return storeQueue.bwdPage(pred, maxPageSize, cursorOpt) + +proc getPage*(storeQueue: StoreQueueRef, + pagingInfo: PagingInfo): + (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = + ## Get a single page of history without filtering. + ## Adhere to the pagingInfo parameters + + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + return getPage(storeQueue, predicate, pagingInfo) + + +method getMessagesByHistoryQuery*( + store: StoreQueueRef, + contentTopic = none(seq[ContentTopic]), + pubsubTopic = none(string), + cursor = none(PagingIndex), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true +): MessageStoreResult[MessageStorePage] = + + proc matchesQuery(indMsg: IndexedWakuMessage): bool = + trace "Matching indexed message against predicate", msg=indMsg + + if pubsubTopic.isSome(): + # filter by pubsub topic + if indMsg.pubsubTopic != pubsubTopic.get(): + trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic + return false + + if startTime.isSome() and endTime.isSome(): + # temporal filtering: select only messages whose sender generated timestamps fall + # between the queried start time and end time + if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get(): + trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp + return false + + if contentTopic.isSome(): + # filter by content topic + if indMsg.msg.contentTopic notin contentTopic.get(): + trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic + return false + + return true + + + let queryPagingInfo = PagingInfo( + pageSize: maxPageSize, + cursor: cursor.get(PagingIndex()), + direction: if ascendingOrder: PagingDirection.FORWARD + else: PagingDirection.BACKWARD + ) + let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo) + + if error == HistoryResponseError.INVALID_CURSOR: + return err("invalid cursor") + + if messages.len == 0: + return ok((messages, none(PagingInfo))) + + ok((messages, some(pagingInfo))) + + +method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] = + ok(int64(s.len())) + +method getOldestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] = + s.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) + +method getNewestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] = + s.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) + + +method deleteMessagesOlderThanTimestamp*(s: StoreQueueRef, ts: Timestamp): MessageStoreResult[void] = + # TODO: Implement this message_store method + err("interface method not implemented") + +method deleteOldestMessagesNotWithinLimit*(s: StoreQueueRef, limit: int): MessageStoreResult[void] = + # TODO: Implement this message_store method + err("interface method not implemented") \ No newline at end of file diff --git a/waku/v2/node/storage/message/sqlite_store/queries.nim b/waku/v2/node/storage/message/sqlite_store/queries.nim index f313bfbd8..28ab95154 100644 --- a/waku/v2/node/storage/message/sqlite_store/queries.nim +++ b/waku/v2/node/storage/message/sqlite_store/queries.nim @@ -227,7 +227,7 @@ proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[st contentTopicWhere &= ")" some(contentTopicWhere) -proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] = +proc cursorWhereClause(cursor: Option[PagingIndex], ascending=true): Option[string] = if cursor.isNone(): return none(string) @@ -292,7 +292,7 @@ proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): Datab proc execSelectMessagesWithLimitStmt(s: SqliteStmt, contentTopic: Option[seq[ContentTopic]], pubsubTopic: Option[string], - cursor: Option[Index], + cursor: Option[PagingIndex], startTime: Option[Timestamp], endTime: Option[Timestamp], onRowCallback: DataProc): DatabaseResult[void] = @@ -353,7 +353,7 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt, proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, contentTopic: Option[seq[ContentTopic]], pubsubTopic: Option[string], - cursor: Option[Index], + cursor: Option[PagingIndex], startTime: Option[Timestamp], endTime: Option[Timestamp], limit: uint64, diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 5a5d83929..032cf80fc 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -92,7 +92,7 @@ method getMessagesByHistoryQuery*( s: SqliteStore, contentTopic = none(seq[ContentTopic]), pubsubTopic = none(string), - cursor = none(Index), + cursor = none(PagingIndex), startTime = none(Timestamp), endTime = none(Timestamp), maxPageSize = DefaultPageSize, @@ -117,7 +117,7 @@ method getMessagesByHistoryQuery*( # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message # Compute last message index let (message, storedAt, pubsubTopic) = rows[^1] - let lastIndex = Index.compute(message, storedAt, pubsubTopic) + let lastIndex = PagingIndex.compute(message, storedAt, pubsubTopic) let pagingInfo = PagingInfo( pageSize: uint64(messages.len), diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 043086c45..8697dae73 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -1,448 +1,9 @@ {.push raises: [Defect].} import - std/[options, algorithm, times], - stew/[results, sorted_set], - chronicles -import - ../../../protocol/waku_message, - ../../../protocol/waku_store/rpc, - ../../../utils/pagination, - ../../../utils/time, - ./message_store + ./queue_store/index, + ./queue_store/queue_store -export pagination - -logScope: - topics = "message_store.storequeue" - - -const StoreQueueDefaultMaxCapacity* = 25_000 - - -type - IndexedWakuMessage* = object - # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message - ## This type is used to encapsulate a WakuMessage and its Index - msg*: WakuMessage - index*: Index - pubsubTopic*: string - - QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} - - -type - StoreQueueRef* = ref object of MessageStore - ## Bounded repository for indexed messages - ## - ## The store queue will keep messages up to its - ## configured capacity. As soon as this capacity - ## is reached and a new message is added, the oldest - ## item will be removed to make space for the new one. - ## This implies both a `delete` and `add` operation - ## for new items. - ## - ## @ TODO: a circular/ring buffer may be a more efficient implementation - ## @ TODO: we don't need to store the Index twice (as key and in the value) - items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages - capacity: int # Maximum amount of messages to keep - - -### Helpers - -proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], - startCursor: Index): - SortedSetResult[Index, IndexedWakuMessage] = - ## Fast forward `w` to start cursor - ## TODO: can probably improve performance here with a binary/tree search - - var nextItem = w.first - - trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem - - ## Fast forward until we reach the startCursor - while nextItem.isOk: - if nextItem.value.key == startCursor: - # Exit ffd loop when we find the start cursor - break - - # Not yet at cursor. Continue advancing - nextItem = w.next - trace "Continuing ffd to start cursor", nextItem=nextItem - - return nextItem - -proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], - startCursor: Index): - SortedSetResult[Index, IndexedWakuMessage] = - ## Rewind `w` to start cursor - ## TODO: can probably improve performance here with a binary/tree search - - var prevItem = w.last - - trace "Rewinding to start cursor", startCursor=startCursor, lastItem=prevItem - - ## Rewind until we reach the startCursor - - while prevItem.isOk: - if prevItem.value.key == startCursor: - # Exit rwd loop when we find the start cursor - break - - # Not yet at cursor. Continue rewinding. - prevItem = w.prev - trace "Continuing rewind to start cursor", prevItem=prevItem - - return prevItem - -proc fwdPage(storeQueue: StoreQueueRef, - pred: QueryFilterMatcher, - maxPageSize: uint64, - startCursor: Option[Index]): - (seq[WakuMessage], PagingInfo, HistoryResponseError) = - ## Populate a single page in forward direction - ## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined. - ## Page size must not exceed `maxPageSize` - ## Each entry must match the `pred` - - trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor - - var - outSeq: seq[WakuMessage] - outPagingInfo: PagingInfo - outError: HistoryResponseError - - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - currentEntry: SortedSetResult[Index, IndexedWakuMessage] - lastValidCursor: Index - - # Find first entry - if startCursor.isSome(): - lastValidCursor = startCursor.get() - - let cursorEntry = w.ffdToCursor(startCursor.get()) - if cursorEntry.isErr: - # Quick exit here if start cursor not found - trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor - outSeq = @[] - outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD) - outError = HistoryResponseError.INVALID_CURSOR - w.destroy - return (outSeq, outPagingInfo, outError) - - # Advance walker once more - currentEntry = w.next - else: - # Start from the beginning of the queue - lastValidCursor = Index() # No valid (only empty) last cursor - currentEntry = w.first - - trace "Starting fwd page query", currentEntry=currentEntry - - ## This loop walks forward over the queue: - ## 1. from the given cursor (or first entry, if not provided) - ## 2. adds entries matching the predicate function to output page - ## 3. until either the end of the queue or maxPageSize is reached - var numberOfItems = 0.uint - while currentEntry.isOk and numberOfItems < maxPageSize: - - trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems - - if pred(currentEntry.value.data): - trace "Current item matches predicate. Adding to output." - lastValidCursor = currentEntry.value.key - outSeq.add(currentEntry.value.data.msg) - numberOfItems += 1 - currentEntry = w.next - w.destroy - - outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, - cursor: lastValidCursor, - direction: PagingDirection.FORWARD) - - outError = HistoryResponseError.NONE - - trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo - - return (outSeq, outPagingInfo, outError) - -proc bwdPage(storeQueue: StoreQueueRef, - pred: QueryFilterMatcher, - maxPageSize: uint64, - startCursor: Option[Index]): - (seq[WakuMessage], PagingInfo, HistoryResponseError) = - ## Populate a single page in backward direction - ## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined. - ## Page size must not exceed `maxPageSize` - ## Each entry must match the `pred` - - trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor - - var - outSeq: seq[WakuMessage] - outPagingInfo: PagingInfo - outError: HistoryResponseError - - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - currentEntry: SortedSetResult[Index, IndexedWakuMessage] - lastValidCursor: Index - - # Find starting entry - if startCursor.isSome(): - lastValidCursor = startCursor.get() - - let cursorEntry = w.rwdToCursor(startCursor.get()) - if cursorEntry.isErr(): - # Quick exit here if start cursor not found - trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor - outSeq = @[] - outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD) - outError = HistoryResponseError.INVALID_CURSOR - w.destroy - return (outSeq, outPagingInfo, outError) - - # Step walker one more step back - currentEntry = w.prev - else: - # Start from the back of the queue - lastValidCursor = Index() # No valid (only empty) last cursor - currentEntry = w.last - - trace "Starting bwd page query", currentEntry=currentEntry - - ## This loop walks backward over the queue: - ## 1. from the given cursor (or last entry, if not provided) - ## 2. adds entries matching the predicate function to output page - ## 3. until either the beginning of the queue or maxPageSize is reached - var numberOfItems = 0.uint - while currentEntry.isOk() and numberOfItems < maxPageSize: - trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems - - if pred(currentEntry.value.data): - trace "Current item matches predicate. Adding to output." - lastValidCursor = currentEntry.value.key - outSeq.add(currentEntry.value.data.msg) - numberOfItems += 1 - currentEntry = w.prev - w.destroy - - outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, - cursor: lastValidCursor, - direction: PagingDirection.BACKWARD) - outError = HistoryResponseError.NONE - - trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo - - return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order - outPagingInfo, - outError) - - -#### API - -proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T = - var items = SortedSet[Index, IndexedWakuMessage].init() - return StoreQueueRef(items: items, capacity: capacity) - - -proc contains*(store: StoreQueueRef, index: Index): bool = - ## Return `true` if the store queue already contains the `index`, `false` otherwise. - store.items.eq(index).isOk() - -proc len*(store: StoreQueueRef): int {.noSideEffect.} = - store.items.len - -proc `$`*(store: StoreQueueRef): string = - $store.items - - -## --- SortedSet accessors --- - -iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = - ## Forward iterator over the entire store queue - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.first() - - while res.isOk(): - yield (res.value.key, res.value.data) - res = w.next() - - w.destroy() - -iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = - ## Backwards iterator over the entire store queue - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.last() - - while res.isOk(): - yield (res.value.key, res.value.data) - res = w.prev() - - w.destroy() - -proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items) - res = w.first() - w.destroy() - - if res.isErr(): - return err("Not found") - - return ok(res.value.data) - -proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = - var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.last() - w.destroy() - - if res.isErr(): - return err("Not found") - - return ok(res.value.data) - - -## --- Queue API --- - -proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] = - ## Add a message to the queue - ## - ## If we're at capacity, we will be removing, the oldest (first) item - if store.contains(msg.index): - trace "could not add item to store queue. Index already exists", index=msg.index - return err("duplicate") - - # TODO: the below delete block can be removed if we convert to circular buffer - if store.items.len >= store.capacity: - var - w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items) - firstItem = w.first - - if cmp(msg.index, firstItem.value.key) < 0: - # When at capacity, we won't add if message index is smaller (older) than our oldest item - w.destroy # Clean up walker - return err("too_old") - - discard store.items.delete(firstItem.value.key) - w.destroy # better to destroy walker after a delete operation - - store.items.insert(msg.index).value.data = msg - - return ok() - - -method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] = - let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) - let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) - store.add(message) - -method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = - procCall MessageStore(store).put(pubsubTopic, message) - - -proc getPage*(storeQueue: StoreQueueRef, - pred: QueryFilterMatcher, - pagingInfo: PagingInfo): - (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = - ## Get a single page of history matching the predicate and - ## adhering to the pagingInfo parameters - - trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo - - let - cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! - else: some(pagingInfo.cursor) - maxPageSize = pagingInfo.pageSize - - case pagingInfo.direction - of PagingDirection.FORWARD: - return storeQueue.fwdPage(pred, maxPageSize, cursorOpt) - of PagingDirection.BACKWARD: - return storeQueue.bwdPage(pred, maxPageSize, cursorOpt) - -proc getPage*(storeQueue: StoreQueueRef, - pagingInfo: PagingInfo): - (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} = - ## Get a single page of history without filtering. - ## Adhere to the pagingInfo parameters - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - return getPage(storeQueue, predicate, pagingInfo) - - -method getMessagesByHistoryQuery*( - store: StoreQueueRef, - contentTopic = none(seq[ContentTopic]), - pubsubTopic = none(string), - cursor = none(Index), - startTime = none(Timestamp), - endTime = none(Timestamp), - maxPageSize = DefaultPageSize, - ascendingOrder = true -): MessageStoreResult[MessageStorePage] = - - proc matchesQuery(indMsg: IndexedWakuMessage): bool = - trace "Matching indexed message against predicate", msg=indMsg - - if pubsubTopic.isSome(): - # filter by pubsub topic - if indMsg.pubsubTopic != pubsubTopic.get(): - trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic - return false - - if startTime.isSome() and endTime.isSome(): - # temporal filtering: select only messages whose sender generated timestamps fall - # between the queried start time and end time - if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get(): - trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp - return false - - if contentTopic.isSome(): - # filter by content topic - if indMsg.msg.contentTopic notin contentTopic.get(): - trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic - return false - - return true - - - let queryPagingInfo = PagingInfo( - pageSize: maxPageSize, - cursor: cursor.get(Index()), - direction: if ascendingOrder: PagingDirection.FORWARD - else: PagingDirection.BACKWARD - ) - let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo) - - if error == HistoryResponseError.INVALID_CURSOR: - return err("invalid cursor") - - if messages.len == 0: - return ok((messages, none(PagingInfo))) - - ok((messages, some(pagingInfo))) - - -method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] = - ok(int64(s.len())) - -method getOldestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] = - s.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) - -method getNewestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] = - s.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) - - -method deleteMessagesOlderThanTimestamp*(s: StoreQueueRef, ts: Timestamp): MessageStoreResult[void] = - # TODO: Implement this message_store method - err("interface method not implemented") - -method deleteOldestMessagesNotWithinLimit*(s: StoreQueueRef, limit: int): MessageStoreResult[void] = - # TODO: Implement this message_store method - err("interface method not implemented") \ No newline at end of file +export + queue_store, + index diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 3ca2c47eb..2b3a06760 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -103,8 +103,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} else: none(seq[ContentTopic]) qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic) else: none(string) - qCursor = if query.pagingInfo.cursor != Index(): some(query.pagingInfo.cursor) - else: none(Index) + qCursor = if query.pagingInfo.cursor != PagingIndex(): some(query.pagingInfo.cursor) + else: none(PagingIndex) qStartTime = if query.startTime != Timestamp(0): some(query.startTime) else: none(Timestamp) qEndTime = if query.endTime != Timestamp(0): some(query.endTime) diff --git a/waku/v2/protocol/waku_store/rpc_codec.nim b/waku/v2/protocol/waku_store/rpc_codec.nim index 857f48349..ad6b2961c 100644 --- a/waku/v2/protocol/waku_store/rpc_codec.nim +++ b/waku/v2/protocol/waku_store/rpc_codec.nim @@ -15,7 +15,7 @@ import const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead -proc encode*(index: Index): ProtoBuffer = +proc encode*(index: PagingIndex): ProtoBuffer = ## Encode an Index object into a ProtoBuffer ## returns the resultant ProtoBuffer @@ -28,9 +28,9 @@ proc encode*(index: Index): ProtoBuffer = return output -proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] = +proc init*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] = ## creates and returns an Index object out of buffer - var index = Index() + var index = PagingIndex() let pb = initProtoBuffer(buffer) var data: seq[byte] @@ -80,7 +80,7 @@ proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = var cursorBuffer: seq[byte] discard ?pb.getField(2, cursorBuffer) - pagingInfo.cursor = ?Index.init(cursorBuffer) + pagingInfo.cursor = ?PagingIndex.init(cursorBuffer) var direction: uint32 discard ?pb.getField(3, direction) diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index 3d8a3f29b..16496fc9a 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -8,15 +8,15 @@ import ../protocol/waku_message, ./time -type MessageDigest* = MDigest[256] - const MaxPageSize*: uint64 = 100 DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page -type Index* = object +type MessageDigest* = MDigest[256] + +type PagingIndex* = object ## This type contains the description of an Index used in the pagination of WakuMessages pubsubTopic*: string senderTime*: Timestamp # the time at which the message is generated @@ -34,60 +34,25 @@ proc computeDigest*(msg: WakuMessage): MessageDigest = # Computes the hash return ctx.finish() -proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T = +proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T = ## Takes a WakuMessage with received timestamp and returns its Index. let digest = computeDigest(msg) senderTime = msg.timestamp - Index( + PagingIndex( pubsubTopic: pubsubTopic, senderTime: senderTime, receiverTime: receivedTime, digest: digest ) -proc `==`*(x, y: Index): bool = +proc `==`*(x, y: PagingIndex): bool = ## receiverTime plays no role in index equality (x.senderTime == y.senderTime) and (x.digest == y.digest) and (x.pubsubTopic == y.pubsubTopic) -proc cmp*(x, y: Index): int = - ## compares x and y - ## returns 0 if they are equal - ## returns -1 if x < y - ## returns 1 if x > y - ## - ## Default sorting order priority is: - ## 1. senderTimestamp - ## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal) - ## 3. message digest - ## 4. pubsubTopic - - if x == y: - # Quick exit ensures receiver time does not affect index equality - return 0 - - # Timestamp has a higher priority for comparison - let - # Use receiverTime where senderTime is unset - xTimestamp = if x.senderTime == 0: x.receiverTime - else: x.senderTime - yTimestamp = if y.senderTime == 0: y.receiverTime - else: y.senderTime - - let timecmp = cmp(xTimestamp, yTimestamp) - if timecmp != 0: - return timecmp - - # Continue only when timestamps are equal - let digestcmp = cmp(x.digest.data, y.digest.data) - if digestcmp != 0: - return digestcmp - - return cmp(x.pubsubTopic, y.pubsubTopic) - type PagingDirection* {.pure.} = enum @@ -98,5 +63,5 @@ type PagingInfo* = object ## This type holds the information needed for the pagination pageSize*: uint64 - cursor*: Index + cursor*: PagingIndex direction*: PagingDirection