diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index ce82c4590..958dfc967 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -266,7 +266,7 @@ procSuite "Waku v2 JSON-RPC API": let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) check: response.messages.len() == 8 - response.pagingOptions.isSome() + response.pagingOptions.isNone() await server.stop() await server.closeWait() diff --git a/tests/v2/test_message_store_queue.nim b/tests/v2/test_message_store_queue.nim index 9cebc0b07..201ed22f1 100644 --- a/tests/v2/test_message_store_queue.nim +++ b/tests/v2/test_message_store_queue.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils, strutils], + std/[sequtils, strutils, algorithm], stew/results, testutils/unittests import @@ -167,105 +167,7 @@ procSuite "Sorted store queue": check: lastRes.isErr() lastRes.error() == "Not found" - - test "forward pagination": - ## Given - let - capacity = 5 - unsortedSet = [5,1,3,2,4] - let store = getPrepopulatedTestStore(unsortedSet, capacity) - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.FORWARD) - - ## When - let pageRes1 = store.getPage(predicate, pagingInfo) - require pageRes1.isOk() - let pageRes2 = store.getPage(predicate, pageRes1.value[1]) - require pageRes2.isOk() - let pageRes3 = store.getPage(predicate, pageRes2.value[1]) - - ## Then - # First page - check pageRes1.isOk() - - var (res, pInfo) = pageRes1.get() - check: - pInfo.pageSize == 3 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(3) - res.mapIt(it.timestamp.int) == @[1,2,3] - - # Second page - check pageRes2.isOk() - - (res, pInfo) = pageRes2.get() - check: - pInfo.pageSize == 2 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(5) - res.mapIt(it.timestamp.int) == @[4,5] - - # Empty last page - check pageRes3.isOk() - - (res, pInfo) = pageRes3.get() - check: - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(5) - res.len == 0 - - test "backward pagination": - ## Given - let - capacity = 5 - unsortedSet = [5,1,3,2,4] - let store = getPrepopulatedTestStore(unsortedSet, capacity) - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD) - - ## When - let pageRes1 = store.getPage(predicate, pagingInfo) - require pageRes1.isOk() - let pageRes2 = store.getPage(predicate, pageRes1.value[1]) - require pageRes2.isOk() - let pageRes3 = store.getPage(predicate, pageRes2.value[1]) - - ## Then - # First page - check pageRes1.isOk() - - var (res, pInfo) = pageRes1.get() - check: - pInfo.pageSize == 3 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(3) - res.mapIt(it.timestamp.int) == @[3,4,5] - - # Second page - check pageRes2.isOk() - - (res, pInfo) = pageRes2.get() - check: - pInfo.pageSize == 2 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(1) - res.mapIt(it.timestamp.int) == @[1,2] - - # Empty last page - check pageRes3.isOk() - - (res, pInfo) = pageRes3.get() - check: - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(1) - res.len == 0 - test "Store queue pagination works with predicate - fwd direction": ## Given let @@ -277,31 +179,16 @@ procSuite "Sorted store queue": ## When let pageRes1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)) - require pageRes1.isOk() - let pageRes2 = store.getPage(onlyEvenTimes, pageRes1.value[1]) ## Then # First page check pageRes1.isOk() - var (res, pInfo) = pageRes1.get() + var res = pageRes1.get().mapIt(it[1]) check: - pInfo.pageSize == 2 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(4) res.mapIt(it.timestamp.int) == @[2,4] - # Empty next page - check pageRes2.isOk() - - (res, pInfo) = pageRes2.get() - check: - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(4) - res.len == 0 - test "Store queue pagination works with predicate - bwd direction": ## Given let @@ -313,42 +200,15 @@ procSuite "Sorted store queue": ## When let pageRes1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)) - require pageRes1.isOk() - let pageRes2 = store.getPage(onlyOddTimes, pageRes1.value[1]) - require pageRes2.isOk() - let pageRes3 = store.getPage(onlyOddTimes, pageRes2.value[1]) ## Then # First page check pageRes1.isOk() - var (res, pInfo) = pageRes1.get() + var res = pageRes1.get().mapIt(it[1]) check: - pInfo.pageSize == 2 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(3) - res.mapIt(it.timestamp.int) == @[3,5] + res.mapIt(it.timestamp.int) == @[3,5].reversed - # Next page - check pageRes2.isOk() - - (res, pInfo) = pageRes2.get() - check: - pInfo.pageSize == 1 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(1) - res.mapIt(it.timestamp.int) == @[1] - - # Empty last page - check pageRes3.isOk() - - (res, pInfo) = pageRes3.get() - check: - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(1) - res.len == 0 - test "handle pagination on empty store - fwd direction": ## Given let capacity = 5 @@ -366,11 +226,8 @@ procSuite "Sorted store queue": # Empty response check pageRes.isOk() - let (res, pInfo) = pageRes.get() + var res = pageRes.get() check: - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == Timestamp(0) res.len == 0 test "handle pagination on empty store - bwd direction": @@ -390,12 +247,8 @@ procSuite "Sorted store queue": # Empty response check pageRes.isOk() - let (res, pInfo) = pageRes.get() + let res = pageRes.get() check: - # Empty response - pInfo.pageSize == 0 - pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == Timestamp(0) res.len == 0 test "handle invalid cursor - fwd direction": diff --git a/tests/v2/test_message_store_queue_pagination.nim b/tests/v2/test_message_store_queue_pagination.nim index 54d59d24f..3343f8973 100644 --- a/tests/v2/test_message_store_queue_pagination.nim +++ b/tests/v2/test_message_store_queue_pagination.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils, times], + std/[options, sequtils, times, algorithm], testutils/unittests, nimcrypto/sha2, libp2p/protobuf/minprotobuf @@ -53,69 +53,49 @@ suite "Queue store - pagination": var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) # test for a normal pagination - var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + var data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 2 data == msgList[4..5] - newPagingInfo.cursor == indexList[5].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == pagingInfo.pageSize # test for an initial pagination request with an empty cursor pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 2 data == msgList[0..1] - newPagingInfo.cursor == indexList[1].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 2 # test for an initial pagination request with an empty cursor to fetch the entire history pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 10 data == msgList[0..9] - newPagingInfo.cursor == indexList[9].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 10 # test for an empty msgList pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet() + data = getPage(getTestStoreQueue(0), pagingInfo).tryGet().mapIt(it[1]) check: data.len == 0 - newPagingInfo.pageSize == 0 - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.cursor == pagingInfo.cursor # test for a page size larger than the remaining messages pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 6 data == msgList[4..9] - newPagingInfo.cursor == indexList[9].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 6 # test for a page size larger than the maximum allowed page size pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: uint64(data.len) <= MaxPageSize - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize <= MaxPageSize # test for a cursor pointing to the end of the message list pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 0 - newPagingInfo.cursor == indexList[9].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 0 # test for an invalid cursor let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic) @@ -127,22 +107,16 @@ suite "Queue store - pagination": # test initial paging query over a message list with one message var singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() + data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 1 - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 1 # test pagination over a message list with one message singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD) - (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() + data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 0 - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 0 test "Backward pagination test": let @@ -153,68 +127,47 @@ suite "Queue store - pagination": var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) # test for a normal pagination - var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + var data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: - data == msgList[1..2] - newPagingInfo.cursor == indexList[1].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == pagingInfo.pageSize + data == msgList[1..2].reversed # test for an empty msgList pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet() + data = getPage(getTestStoreQueue(0), pagingInfo).tryGet().mapIt(it[1]) check: data.len == 0 - newPagingInfo.pageSize == 0 - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.cursor == pagingInfo.cursor # test for an initial pagination request with an empty cursor pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 2 - data == msgList[8..9] - newPagingInfo.cursor == indexList[8].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 2 + data == msgList[8..9].reversed # test for an initial pagination request with an empty cursor to fetch the entire history pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 10 - data == msgList[0..9] - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 10 + data == msgList[0..9].reversed # test for a page size larger than the remaining messages pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: - data == msgList[0..2] - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 3 + data == msgList[0..2].reversed # test for a page size larger than the Maximum allowed page size pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: uint64(data.len) <= MaxPageSize - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize <= MaxPageSize # test for a cursor pointing to the begining of the message list pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(store, pagingInfo).tryGet() - + data = getPage(store, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 0 - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 0 # test for an invalid cursor let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic) @@ -226,19 +179,13 @@ suite "Queue store - pagination": # test initial paging query over a message list with one message var singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() + data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 1 - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 1 # test paging query over a message list with one message singleItemMsgList = getTestStoreQueue(1) pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD) - (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet() + data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1]) check: data.len == 0 - newPagingInfo.cursor == indexList[0].toPagingIndex() - newPagingInfo.direction == pagingInfo.direction - newPagingInfo.pageSize == 0 diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index 33f58f40f..0f1e12351 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -4,8 +4,7 @@ import std/[unittest, options, tables, sets, times, strutils, sequtils, os], stew/byteutils, chronos, - chronicles, - sqlite3_abi + chronicles import ../../waku/v2/node/storage/message/sqlite_store, ../../waku/v2/node/storage/message/message_retention_policy, @@ -95,7 +94,7 @@ suite "SQLite message store - insert messages": check: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: - let (_, msg, pubsubTopic) = item + let (pubsubTopic, msg, digest, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic @@ -133,7 +132,7 @@ suite "SQLite message store - insert messages": check: storedMsg.len == storeCapacity storedMsg.all do (item: auto) -> bool: - let (_, msg, pubsubTopic) = item + let (pubsubTopic, msg, digest, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic @@ -186,7 +185,7 @@ suite "Message Store": # flags for receiver timestamp var rt1Flag, rt2Flag, rt3Flag: bool = false - for (receiverTimestamp, msg, pubsubTopic) in result: + for (pubsubTopic, msg, digest, receiverTimestamp) in result: check: pubsubTopic == DefaultPubsubTopic diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index 3a7cede8c..7f20d6610 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, tables, sets, times, strutils, sequtils], + std/[options, tables, sets, times, strutils, sequtils, algorithm], stew/byteutils, unittest2, chronos, @@ -78,15 +78,12 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic filteredMessages == messages[2..3] - - check: - pagingInfo.isSome() ## Teardown store.close() @@ -126,15 +123,12 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic - filteredMessages == messages[6..7] - - check: - pagingInfo.isSome() + filteredMessages == messages[6..7].reversed ## Teardown store.close() @@ -176,16 +170,13 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic in @[contentTopic1, contentTopic2] filteredMessages == messages[2..3] - check: - pagingInfo.isSome() - ## Teardown store.close() @@ -230,16 +221,13 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic filteredMessages == messages2[0..1] - check: - pagingInfo.isSome() - ## Teardown store.close() @@ -281,16 +269,13 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic filteredMessages == messages[5..6] - check: - pagingInfo.isSome() - ## Teardown store.close() @@ -332,16 +317,13 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic - filteredMessages == messages[4..5] + filteredMessages == messages[4..5].reversed - check: - pagingInfo.isSome() - ## Teardown store.close() @@ -387,15 +369,12 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic filteredMessages == messages2[0..1] - - check: - pagingInfo.isSome() ## Teardown store.close() @@ -430,10 +409,9 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 0 - pagingInfo.isNone() ## Teardown store.close() @@ -460,11 +438,9 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 50 - pagingInfo.isSome() - pagingInfo.get().pageSize == 50 ## Teardown store.close() @@ -491,11 +467,9 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 40 - pagingInfo.isSome() - pagingInfo.get().pageSize == 40 ## Teardown store.close() @@ -520,8 +494,7 @@ suite "message store - history query": ] for msg in messages: - let digest = computeDigest(msg) - require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -536,16 +509,13 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 2 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic filteredMessages == messages[1..2] - check: - pagingInfo.isSome() - ## Teardown store.close() @@ -567,8 +537,7 @@ suite "message store - history query": ] for msg in messages: - let digest = computeDigest(msg) - require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -582,10 +551,9 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 0 - pagingInfo.isNone() ## Teardown store.close() @@ -609,8 +577,7 @@ suite "message store - history query": ] for msg in messages: - let digest = computeDigest(msg) - require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -622,15 +589,12 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 3 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic - filteredMessages == messages[2..4] - - check: - pagingInfo.isSome() + filteredMessages == messages[2..4].reversed ## Teardown store.close() @@ -655,8 +619,7 @@ suite "message store - history query": ] for msg in messages: - let digest = computeDigest(msg) - require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let cursor = PagingIndex.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) @@ -672,15 +635,12 @@ suite "message store - history query": check: res.isOk() - let (filteredMessages, pagingInfo) = res.tryGet() + let filteredMessages = res.tryGet().mapIt(it[1]) check: filteredMessages.len == 1 filteredMessages.all do (msg: WakuMessage) -> bool: msg.contentTopic == contentTopic filteredMessages == @[messages[^1]] - check: - pagingInfo.isSome() - ## Teardown store.close() diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index bb6e465e6..0ae864c88 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -444,9 +444,7 @@ suite "Waku Store - history query": ## No pagination specified. Response will be auto-paginated with ## up to MaxPageSize messages per page. response.messages.len() == 8 - response.pagingInfo.pageSize == 8 - response.pagingInfo.direction == PagingDirection.BACKWARD - response.pagingInfo.cursor != PagingIndex() + response.pagingInfo == PagingInfo() ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/waku/v2/node/storage/message/dual_message_store.nim b/waku/v2/node/storage/message/dual_message_store.nim index 33c0c94d1..79553483f 100644 --- a/waku/v2/node/storage/message/dual_message_store.nim +++ b/waku/v2/node/storage/message/dual_message_store.nim @@ -33,9 +33,8 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message if res.isErr(): warn "failed to load messages from the persistent store", err = res.error else: - for (receiverTime, msg, pubsubTopic) in res.value: - let digest = computeDigest(msg) - discard inmemory.put(pubsubTopic, msg, digest, receiverTime) + for (pubsubTopic, msg, _, storeTimestamp) in res.value: + discard inmemory.put(pubsubTopic, msg, computeDigest(msg), storeTimestamp) info "successfully loaded messages from the persistent store" @@ -65,7 +64,7 @@ method getMessagesByHistoryQuery*( endTime = none(Timestamp), maxPageSize = MaxPageSize, ascendingOrder = true -): MessageStoreResult[MessageStorePage] = +): MessageStoreResult[seq[MessageStoreRow]] = s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder) diff --git a/waku/v2/node/storage/message/queue_store/queue_store.nim b/waku/v2/node/storage/message/queue_store/queue_store.nim index 31838a161..47cc6b7c8 100644 --- a/waku/v2/node/storage/message/queue_store/queue_store.nim +++ b/waku/v2/node/storage/message/queue_store/queue_store.nim @@ -1,7 +1,7 @@ {.push raises: [Defect].} import - std/[options, algorithm, times], + std/[options, times], stew/[results, sorted_set], chronicles import @@ -30,7 +30,7 @@ type QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} - StoreQueueGetPageResult = Result[(seq[WakuMessage], PagingInfo), HistoryResponseError] + StoreQueueGetPageResult = Result[seq[MessageStoreRow], HistoryResponseError] type StoreQueueRef* = ref object of MessageStore @@ -108,21 +108,15 @@ proc getPage(storeQueue: StoreQueueRef, trace "Retrieving page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor, forward=forward - var - outSeq: seq[WakuMessage] - outPagingInfo: PagingInfo + var outSeq: seq[MessageStoreRow] var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) defer: w.destroy() - - var - currentEntry: SortedSetResult[Index, IndexedWakuMessage] - lastValidCursor: Index + + var currentEntry: SortedSetResult[Index, IndexedWakuMessage] # Find starting entry if startCursor.isSome(): - lastValidCursor = startCursor.get() - let cursorEntry = if forward: w.ffdToCursor(startCursor.get()) else: w.rwdToCursor(startCursor.get()) if cursorEntry.isErr(): @@ -135,7 +129,6 @@ proc getPage(storeQueue: StoreQueueRef, else: w.prev() else: # Start from the beginning of the queue - lastValidCursor = Index() # No valid (only empty) last cursor currentEntry = if forward: w.first() else: w.last() @@ -150,25 +143,20 @@ proc getPage(storeQueue: StoreQueueRef, trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems if pred(currentEntry.value.data): - lastValidCursor = currentEntry.value.key - outSeq.add(currentEntry.value.data.msg) + let + key = currentEntry.value.key + data = currentEntry.value.data + numberOfItems += 1 + outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime)) + currentEntry = if forward: w.next() else: w.prev() trace "Successfully retrieved page", len=outSeq.len - - outPagingInfo = PagingInfo(pageSize: outSeq.len.uint, - cursor: lastValidCursor.toPagingIndex(), - direction: if forward: PagingDirection.FORWARD - else: PagingDirection.BACKWARD) - - # Even if paging backwards, each page should be in forward order - if not forward: - outSeq.reverse() - return ok((outSeq, outPagingInfo)) + return ok(outSeq) #### API @@ -310,7 +298,7 @@ method getMessagesByHistoryQuery*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true -): MessageStoreResult[MessageStorePage] = +): MessageStoreResult[seq[MessageStoreRow]] = proc matchesQuery(indMsg: IndexedWakuMessage): bool = trace "Matching indexed message against predicate", msg=indMsg @@ -347,11 +335,7 @@ method getMessagesByHistoryQuery*( if getPageRes.isErr(): return err("invalid cursor") - let (messages, pagingInfo) = getPageRes.value - if messages.len == 0: - return ok((messages, none(PagingInfo))) - - ok((messages, some(pagingInfo))) + ok(getPageRes.value) method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] = diff --git a/waku/v2/node/storage/message/sqlite_store/queries.nim b/waku/v2/node/storage/message/sqlite_store/queries.nim index 31aa444dd..832f9caf2 100644 --- a/waku/v2/node/storage/message/sqlite_store/queries.nim +++ b/waku/v2/node/storage/message/sqlite_store/queries.nim @@ -50,6 +50,13 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str pubsubTopic +proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] = + let + digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol)) + digestLength = sqlite3_column_bytes(s, 3) + digest = @(toOpenArray(digestPointer, 0, digestLength-1)) + + digest ### SQLite queries @@ -188,20 +195,21 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" & " FROM " & table & " ORDER BY storedAt ASC" -proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] = +proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(string, WakuMessage, seq[byte], Timestamp)]] = ## Retrieve all messages from the store. - var rows: seq[(Timestamp, WakuMessage, string)] + var rows: seq[(string, WakuMessage, seq[byte], Timestamp)] proc queryRowCallback(s: ptr sqlite3_stmt) = let - storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) + wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) + digest = queryRowDigestCallback(s, digestCol=6) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - rows.add((storedAt, wakuMessage, pubsubTopic)) + rows.add((pubsubTopic, wakuMessage, digest, storedAt)) let query = selectAllMessagesQuery(DbTable) let res = db.query(query, queryRowCallback) @@ -274,7 +282,7 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u var query: string - query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" query &= " FROM " & table if where.isSome(): @@ -353,17 +361,18 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, startTime: Option[Timestamp], endTime: Option[Timestamp], limit: uint64, - ascending: bool): DatabaseResult[seq[(WakuMessage, Timestamp, string)]] = + ascending: bool): DatabaseResult[seq[(string, WakuMessage, seq[byte], Timestamp)]] = - var messages: seq[(WakuMessage, Timestamp, string)] = @[] + var messages: seq[(string, WakuMessage, seq[byte], Timestamp)] = @[] proc queryRowCallback(s: ptr sqlite3_stmt) = let - storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) + message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) + digest = queryRowDigestCallback(s, digestCol=6) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - messages.add((message, storedAt, pubsubTopic)) + messages.add((pubsubTopic, message, digest, storedAt)) let query = block: let diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index c14dcf8dc..2eb238aec 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -74,7 +74,7 @@ method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: M message.timestamp # senderTimestamp )) if res.isErr(): - return err("message insert failed: " & res.error()) + return err("message insert failed: " & res.error) ok() @@ -97,10 +97,10 @@ method getMessagesByHistoryQuery*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true -): MessageStoreResult[MessageStorePage] = +): MessageStoreResult[seq[MessageStoreRow]] = let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic)) - let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( + return s.db.selectMessagesByHistoryQueryWithLimit( contentTopic, pubsubTopic, cursor, @@ -110,29 +110,6 @@ method getMessagesByHistoryQuery*( ascending=ascendingOrder ) - if rows.len <= 0: - return ok((@[], none(PagingInfo))) - - var messages = rows.mapIt(it[0]) - - # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message - # Compute last message index - let (message, storedAt, pubsubTopic) = rows[^1] - let lastIndex = PagingIndex.compute(message, storedAt, pubsubTopic) - - let pagingInfo = PagingInfo( - pageSize: uint64(messages.len), - cursor: lastIndex, - direction: if ascendingOrder: PagingDirection.FORWARD - else: PagingDirection.BACKWARD - ) - - # The retrieved messages list should always be in chronological order - if not ascendingOrder: - messages.reverse() - - ok((messages, some(pagingInfo))) - method getMessagesCount*(s: SqliteStore): MessageStoreResult[int64] = s.db.getMessageCount() diff --git a/waku/v2/protocol/waku_store/message_store.nim b/waku/v2/protocol/waku_store/message_store.nim index 6ac956467..30e8440e4 100644 --- a/waku/v2/protocol/waku_store/message_store.nim +++ b/waku/v2/protocol/waku_store/message_store.nim @@ -15,9 +15,7 @@ import type MessageStoreResult*[T] = Result[T, string] - MessageStorePage* = (seq[WakuMessage], Option[PagingInfo]) - - MessageStoreRow* = (Timestamp, WakuMessage, string) + MessageStoreRow* = (string, WakuMessage, seq[byte], Timestamp) MessageStore* = ref object of RootObj @@ -44,7 +42,7 @@ method getMessagesByHistoryQuery*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true -): MessageStoreResult[MessageStorePage] {.base.} = discard +): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard # Store manipulation diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 7038fd263..e666b0a4d 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -4,7 +4,7 @@ {.push raises: [Defect].} import - std/[tables, times, sequtils, options], + std/[tables, times, sequtils, options, algorithm], stew/results, chronicles, chronos, @@ -122,7 +122,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} cursor = qCursor, startTime = qStartTime, endTime = qEndTime, - maxPageSize = qMaxPageSize, + maxPageSize = qMaxPageSize + 1, ascendingOrder = qAscendingOrder ) @@ -135,11 +135,46 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} if queryRes.isErr(): return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR) - let (messages, updatedPagingInfo) = queryRes.get() + let rows = queryRes.get() + if rows.len <= 0: + return HistoryResponse(messages: @[], error: HistoryResponseError.NONE) + + var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1]) + else: rows[0..^2].mapIt(it[1]) + var pagingInfo = none(PagingInfo) + + # The retrieved messages list should always be in chronological order + if not qAscendingOrder: + messages.reverse() + + + if rows.len > int(qMaxPageSize): + # Build last message cursor + let (pubsubTopic, message, digest, storeTimestamp) = rows[^1] + + # TODO: Improve coherence of MessageDigest type + var messageDigest: array[32, byte] + for i in 0..