diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 433a1145c..cb65f9d9f 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -28,9 +28,9 @@ import ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/discv5/waku_discv5, - ../../waku/v2/node/message_store/sqlite_store/migrations as message_store_sqlite_migrations, - ../../waku/v2/node/message_store/waku_store_queue, + ../../waku/v2/node/message_store/queue_store, ../../waku/v2/node/message_store/sqlite_store, + ../../waku/v2/node/message_store/sqlite_store/migrations as message_store_sqlite_migrations, ../../waku/v2/node/message_store/message_retention_policy, ../../waku/v2/node/message_store/message_retention_policy_capacity, ../../waku/v2/node/message_store/message_retention_policy_time, diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 4f40cf5e9..227a63c3d 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -14,7 +14,7 @@ import libp2p/protocols/pubsub/rpc/message import ../../waku/v1/node/rpc/hexstrings, - ../../waku/v2/node/message_store/waku_store_queue, + ../../waku/v2/node/message_store/queue_store, ../../waku/v2/node/waku_node, ../../waku/v2/node/jsonrpc/[store_api, relay_api, diff --git a/tests/v2/test_message_store_queue.nim b/tests/v2/test_message_store_queue.nim index 64c43127f..b419c9dae 100644 --- a/tests/v2/test_message_store_queue.nim +++ b/tests/v2/test_message_store_queue.nim @@ -1,11 +1,11 @@ {.used.} import - std/[sequtils, algorithm], + std/[options, sequtils, algorithm], stew/results, testutils/unittests import - ../../waku/v2/node/message_store/waku_store_queue, + ../../waku/v2/node/message_store/queue_store, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, ../../waku/v2/utils/time @@ -21,8 +21,8 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = let message = WakuMessage(payload: @[byte i], timestamp: Timestamp(i)) cursor = Index( - receiverTime: Timestamp(i), - senderTime: Timestamp(i), + receiverTime: Timestamp(i), + senderTime: Timestamp(i), digest: MessageDigest(data: data), pubsubTopic: "test-pubsub-topic" ) @@ -168,130 +168,6 @@ procSuite "Sorted store queue": lastRes.isErr() lastRes.error() == "Not found" - test "Store queue pagination works with predicate - fwd direction": - ## Given - let - capacity = 5 - unsortedSet = [5,1,3,2,4] - let store = getPrepopulatedTestStore(unsortedSet, capacity) - - proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0 - - ## When - let pageRes1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)) - - ## Then - # First page - check pageRes1.isOk() - - var res = pageRes1.get().mapIt(it[1]) - check: - res.mapIt(it.timestamp.int) == @[2,4] - - - test "Store queue pagination works with predicate - bwd direction": - ## Given - let - capacity = 5 - unsortedSet = [5,1,3,2,4] - let store = getPrepopulatedTestStore(unsortedSet, capacity) - - proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0 - - ## When - let pageRes1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)) - - ## Then - # First page - check pageRes1.isOk() - - var res = pageRes1.get().mapIt(it[1]) - check: - res.mapIt(it.timestamp.int) == @[3,5].reversed - - test "handle pagination on empty store - fwd direction": - ## Given - let capacity = 5 - let store = StoreQueueRef.new(capacity) - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.FORWARD) - - ## When - # Get page from empty queue in fwd dir - let pageRes = store.getPage(predicate, pagingInfo) - - ## Then - # Empty response - check pageRes.isOk() - - var res = pageRes.get() - check: - res.len == 0 - - test "handle pagination on empty store - bwd direction": - ## Given - let capacity = 5 - let store = StoreQueueRef.new(capacity) - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD) - - ## When - # Get page from empty queue in bwd dir - let pageRes = store.getPage(predicate, pagingInfo) - - ## Then - # Empty response - check pageRes.isOk() - - let res = pageRes.get() - check: - res.len == 0 - - test "handle invalid cursor - fwd direction": - ## Given - let - capacity = 5 - unsortedSet = [5,1,3,2,4] - let store = getPrepopulatedTestStore(unsortedSet, capacity) - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest()) - let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD) - - ## When - let pageRes = store.getPage(predicate, pagingInfo) - - ## Then - check: - pageRes.isErr() - pageRes.error == HistoryResponseError.INVALID_CURSOR - - test "handle invalid cursor - bwd direction": - ## Given - let - capacity = 5 - unsortedSet = [5,1,3,2,4] - let store = getPrepopulatedTestStore(unsortedSet, capacity) - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest()) - let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD) - - ## When - let pageRes = store.getPage(predicate, pagingInfo) - - ## Then - # Empty response with error - check: - pageRes.isErr() - pageRes.error == HistoryResponseError.INVALID_CURSOR - test "verify if store queue contains an index": ## Given let diff --git a/tests/v2/test_message_store_queue_pagination.nim b/tests/v2/test_message_store_queue_pagination.nim index af932b42b..c99875bee 100644 --- a/tests/v2/test_message_store_queue_pagination.nim +++ b/tests/v2/test_message_store_queue_pagination.nim @@ -1,12 +1,12 @@ {.used.} import - std/[sequtils, algorithm], + std/[options, sequtils, algorithm], testutils/unittests, - nimcrypto/sha2, libp2p/protobuf/minprotobuf import - ../../waku/v2/node/message_store/waku_store_queue, + ../../waku/v2/node/message_store/queue_store/queue_store {.all.}, + ../../waku/v2/node/message_store/queue_store/index, ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_message, ../../waku/v2/utils/time, @@ -21,11 +21,11 @@ proc getTestStoreQueue(numMessages: int): StoreQueueRef = for i in 0.. endTime.get() or indMsg.msg.timestamp < startTime.get(): - trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp return false if contentTopic.isSome(): - # filter by content topic if indMsg.msg.contentTopic notin contentTopic.get(): - trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic return false return true + var pageRes: StoreQueueGetPageResult + try: + pageRes = store.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery) + except: + return err(getCurrentExceptionMsg()) - let queryPagingInfo = PagingInfo( - pageSize: maxPageSize, - cursor: cursor.get(PagingIndex()), - direction: if ascendingOrder: PagingDirection.FORWARD - else: PagingDirection.BACKWARD - ) - let getPageRes = store.getPage(matchesQuery, queryPagingInfo) - if getPageRes.isErr(): - return err("invalid cursor") - - ok(getPageRes.value) + if pageRes.isErr(): + case pageRes.error: + of StoreQueueErrorKind.INVALID_CURSOR: + return err("invalid cursor") + + ok(pageRes.value) method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] = diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 9d6b8fb80..e715add06 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -36,7 +36,6 @@ import ../utils/peers, ../utils/wakuenr, ./peer_manager/peer_manager, - ./message_store/waku_store_queue, ./message_store/message_retention_policy, ./message_store/message_retention_policy_capacity, ./message_store/message_retention_policy_time, diff --git a/waku/v2/protocol/waku_store/message_store.nim b/waku/v2/protocol/waku_store/message_store.nim index a0f0dd9e1..1a5e3d56c 100644 --- a/waku/v2/protocol/waku_store/message_store.nim +++ b/waku/v2/protocol/waku_store/message_store.nim @@ -26,7 +26,7 @@ type # MessageStore interface method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard -method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = +method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] {.base.} = let digest = computeDigest(message) receivedTime = if message.timestamp > 0: message.timestamp diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 5d04e1483..6ed29944d 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -19,7 +19,6 @@ import metrics import ../../node/message_store/message_retention_policy, - ../../node/message_store/waku_store_queue, ../../node/peer_manager/peer_manager, ../../utils/time, ../waku_message, @@ -220,15 +219,6 @@ proc new*(T: type WakuStore, return ws -proc init*(T: type WakuStore, - peerManager: PeerManager, - rng: ref rand.HmacDrbgContext, - wakuSwap: WakuSwap = nil, - retentionPolicy=none(MessageRetentionPolicy)): T = - let store = StoreQueueRef.new() - WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy) - - proc isValidMessage(msg: WakuMessage): bool = if msg.timestamp == 0: return true