diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 66dcf6ba0..40a7022c8 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -192,3 +192,367 @@ suite "Message Store": # the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store numMessages == 120 + +suite "Message Store: Retrieve Pages": + setup: + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + contentTopic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic = "/waku/2/default-waku/proto" + + t1 = getNanosecondTime(epochTime())-800 + t2 = getNanosecondTime(epochTime())-700 + t3 = getNanosecondTime(epochTime())-600 + t4 = getNanosecondTime(epochTime())-500 + t5 = getNanosecondTime(epochTime())-400 + t6 = getNanosecondTime(epochTime())-300 + t7 = getNanosecondTime(epochTime())-200 + t8 = getNanosecondTime(epochTime())-100 + t9 = getNanosecondTime(epochTime()) + + var msgs = @[ + WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1), + WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2), + WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3), + WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4), + WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5), + WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6), + WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7), + WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8), + WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9), + ] + + var indexes: seq[Index] = @[] + for msg in msgs: + var index = computeIndex(msg) + let output = store.put(index, msg, pubsubTopic) + check output.isOk + indexes.add(index) + + teardown: + store.close() + + test "get forward page": + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[1], + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 3, + cursor: indexes[4], + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 3 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[k+2] == m # offset of two because we used the second message (indexes[1]) as cursor; the cursor is excluded in the returned page + + test "get backward page": + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[4], + direction: PagingDirection.BACKWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 3, + cursor: indexes[1], + direction: PagingDirection.BACKWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 3 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[^(k+6)] == m + + + test "get forward page (default index)": + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + # we don't set an index here to test the default index + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 3, + cursor: indexes[2], + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 3 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[k] == m + + + test "get backward page (default index)": + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + # we don't set an index here to test the default index + direction: PagingDirection.BACKWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 3, + cursor: indexes[6], + direction: PagingDirection.BACKWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 3 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[^(k+1)] == m + + + test "get large forward page": + let maxPageSize = 12'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + # we don't set an index here; start at the beginning + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 9, # there are only 10 msgs in total in the DB + cursor: indexes[8], + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 9 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[k] == m + + + test "get large backward page": + let maxPageSize = 12'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + # we don't set an index here to test the default index + direction: PagingDirection.BACKWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 9, + cursor: indexes[0], + direction: PagingDirection.BACKWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 9 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[^(k+1)] == m + + + test "get filtered page, maxPageSize == number of matching messages": + proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = + # filter on timestamp + if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t6: + return false + + return true + + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[1], + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 3, + cursor: indexes[5], + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 3 + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window + + test "get filtered page, maxPageSize > number of matching messages": + proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = + # filter on timestamp + if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t5: + return false + return true + + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[1], + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 2, + cursor: indexes[8], # index is advanced by one because one message was not accepted by the filter + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 2 # only two messages are in the time window going through the filter + outPagingInfo == expectedOutPagingInfo + + for (k, m) in outMessages.pairs(): + check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window + + + test "get page with index that is not in the DB": + let maxPageSize = 3'u64 + + let nonStoredMsg = WakuMessage(payload: @[byte 13], contentTopic: "hello", version: uint32(3), timestamp: getNanosecondTime(epochTime())) + var nonStoredIndex = computeIndex(nonStoredMsg) + + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: nonStoredIndex, + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected + cursor: Index(), + direction: PagingDirection.FORWARD) + check: + outError == HistoryResponseError.INVALID_CURSOR + outMessages.len == 0 # only two messages are in the time window going through the filter + outPagingInfo == expectedOutPagingInfo + + test "ask for last index, forward": + let maxPageSize = 3'u64 + + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[8], + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the last index + cursor: Index(), # empty index, because we went beyond the last index + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid? + outMessages.len == 0 # no message expected, because we already have the last index + outPagingInfo == expectedOutPagingInfo + + test "ask for first index, backward": + let maxPageSize = 3'u64 + + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[0], + direction: PagingDirection.BACKWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the first index (and go trough backwards) + cursor: Index(), # empty index, because we went beyond the first index (backwards) + direction: PagingDirection.BACKWARD) + + check: + outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid? + outMessages.len == 0 # no message expected, because we already have the first index + outPagingInfo == expectedOutPagingInfo + + + + test "valid index but no predicate matches": + proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = + return false + + let maxPageSize = 3'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + cursor: indexes[1], + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 0, + cursor: indexes[8], # last index; DB was searched until the end (no message matched the predicate) + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 0 # predicate is false for each message + outPagingInfo == expectedOutPagingInfo + + test "sparse filter (-> merging several DB pages into one reply page)": + proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = + if indMsg.msg.payload[0] mod 4 == 0: + return true + + let maxPageSize = 2'u64 + let pagingInfo = PagingInfo(pageSize: maxPageSize, + # we don't set an index here, starting from the beginning. This also covers the inital special case of the retrieval loop. + direction: PagingDirection.FORWARD) + let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() + + let expectedOutPagingInfo = PagingInfo(pageSize: 2, + cursor: indexes[7], + direction: PagingDirection.FORWARD) + + check: + outError == HistoryResponseError.NONE + outMessages.len == 2 + outPagingInfo == expectedOutPagingInfo + outMessages[0] == msgs[3] + outMessages[1] == msgs[7] + +test "Message Store: Retention Time": # TODO: better retention time test coverage + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database, isSqliteOnly=true, retentionTime=100)[] + contentTopic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic = "/waku/2/default-waku/proto" + + t1 = getNanosecondTime(epochTime())-300_000_000_000 + t2 = getNanosecondTime(epochTime())-200_000_000_000 + t3 = getNanosecondTime(epochTime())-100_000_000_000 + t4 = getNanosecondTime(epochTime())-50_000_000_000 + t5 = getNanosecondTime(epochTime())-40_000_000_000 + t6 = getNanosecondTime(epochTime())-3_000_000_000 + t7 = getNanosecondTime(epochTime())-2_000_000_000 + t8 = getNanosecondTime(epochTime())-1_000_000_000 + t9 = getNanosecondTime(epochTime()) + + var msgs = @[ + WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1), + WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2), + WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3), + WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4), + WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5), + WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6), + WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7), + WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8), + WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9), + ] + + var indexes: seq[Index] = @[] + for msg in msgs: + var index = computeIndex(msg, receivedTime = msg.timestamp) + let output = store.put(index, msg, pubsubTopic) + check output.isOk + indexes.add(index) + + # let maxPageSize = 9'u64 + # let pagingInfo = PagingInfo(pageSize: maxPageSize, + # direction: PagingDirection.FORWARD) + # let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() + + # let expectedOutPagingInfo = PagingInfo(pageSize: 7, + # cursor: indexes[8], + # direction: PagingDirection.FORWARD) + + # check: + # outError == HistoryResponseError.NONE + # outMessages.len == 7 + # outPagingInfo == expectedOutPagingInfo + + # for (k, m) in outMessages.pairs(): + # check: msgs[k+2] == m # offset of two because the frist two messages got deleted + + # store.close() + + diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 12c515690..d94bf5a41 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -33,7 +33,9 @@ procSuite "Waku Store": await listenSwitch.start() let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) @@ -78,7 +80,9 @@ procSuite "Waku Store": await listenSwitch.start() let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)]) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) @@ -125,7 +129,9 @@ procSuite "Waku Store": await listenSwitch.start() let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) pubsubtopic1 = "queried topic" pubsubtopic2 = "non queried topic" # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) @@ -173,7 +179,9 @@ procSuite "Waku Store": await listenSwitch.start() let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) pubsubtopic1 = "queried topic" pubsubtopic2 = "non queried topic" # this query targets: pubsubtopic1 @@ -219,7 +227,9 @@ procSuite "Waku Store": await listenSwitch.start() let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) pubsubtopic = "queried topic" # this query targets: pubsubtopic rpc = HistoryQuery(pubsubTopic: pubsubtopic) @@ -343,7 +353,9 @@ procSuite "Waku Store": await listenSwitch.start() let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) @@ -395,7 +407,10 @@ procSuite "Waku Store": var listenSwitch = newStandardSwitch(some(key)) await listenSwitch.start() - let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) @@ -446,7 +461,10 @@ procSuite "Waku Store": var listenSwitch = newStandardSwitch(some(key)) await listenSwitch.start() - let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) @@ -606,7 +624,10 @@ procSuite "Waku Store": var listenSwitch = newStandardSwitch(some(key)) await listenSwitch.start() - let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) @@ -626,7 +647,10 @@ procSuite "Waku Store": var listenSwitch2 = newStandardSwitch(some(key2)) await listenSwitch2.start() - let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) + let + database2 = SqliteDatabase.init("", inMemory = true)[] + store2 = WakuMessageStore.init(database2)[] + proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng(), store2) proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo()) @@ -689,13 +713,17 @@ procSuite "Waku Store": # starts a new node var dialSwitch3 = newStandardSwitch() await dialSwitch3.start() - - let proto3 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) + + let + database3 = SqliteDatabase.init("", inMemory = true)[] + store3 = WakuMessageStore.init(database3)[] + proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng(), store3) + proto3.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) let successResult = await proto3.resume() check: - successResult.isOk + successResult.isOk successResult.value == 10 proto3.messages.len == 10 @@ -758,7 +786,11 @@ procSuite "Waku Store": # starts a new node var dialSwitch3 = newStandardSwitch() await dialSwitch3.start() - let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng()) + + let + database3 = SqliteDatabase.init("", inMemory = true)[] + store3 = WakuMessageStore.init(database3)[] + proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng(), store3) let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(), listenSwitch.peerInfo.toRemotePeerInfo(), @@ -796,7 +828,7 @@ procSuite "Waku Store": check: store.messages.len == capacity # Store is at capacity - + # Test that capacity holds await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1))) diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index ce2758a95..3814f0c6c 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -160,6 +160,16 @@ type defaultValue: 50000 name: "store-capacity" }: int + sqliteStore* {. + desc: "Enable sqlite-only store: true|false", + defaultValue: false + name: "sqlite-store" }: bool + + sqliteRetentionTime* {. + desc: "time the sqlite-only store keeps messages (in seconds)", + defaultValue: 30.days.seconds + name: "sqlite-retention-time" }: int64 # TODO: Duration + ## Filter config filter* {. diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index a0fe359ea..0e650c003 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -4,6 +4,7 @@ import std/options, stew/results, ../../../protocol/waku_message, + ../../../protocol/waku_store/waku_store_types, ../../../utils/time, ../../../utils/pagination @@ -21,4 +22,6 @@ type # MessageStore interface method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard +method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard +method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 072c781c3..91bc57cea 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -1,13 +1,15 @@ {.push raises: [Defect].} import - std/[options, tables], + std/[options, tables, times], sqlite3_abi, stew/[byteutils, results], chronicles, + chronos, ./message_store, ../sqlite, ../../../protocol/waku_message, + ../../../protocol/waku_store/waku_store, ../../../utils/pagination, ../../../utils/time @@ -37,6 +39,9 @@ type storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. storeMaxLoad: int # = storeCapacity * MaxStoreOverflow deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs + isSqliteOnly: bool + retentionTime: chronos.Duration + oldestReceiverTimestamp: int64 insertStmt: SqliteStmt[(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), void] proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] = @@ -49,6 +54,36 @@ proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] = return err("failed to count number of messages in DB") ok(numMessages) + +proc getOldestDbReceiverTimestamp(db: SqliteDatabase): MessageStoreResult[int64] = + var oldestReceiverTimestamp: int64 + proc handler(s: ptr sqlite3_stmt) = + oldestReceiverTimestamp = column_timestamp(s, 0) + let query = "SELECT MIN(receiverTimestamp) FROM " & TABLE_TITLE; + let queryRes = db.query(query, handler) + if queryRes.isErr: + return err("failed to get the oldest receiver timestamp from the DB") + ok(oldestReceiverTimestamp) + +proc deleteOldestTime(db: WakuMessageStore): MessageStoreResult[void] = + # delete if there are messages in the DB that exceed the retention time by 10% and more (batch delete for efficiency) + let retentionTimestamp = getNanosecondTime(getTime().toUnixFloat()) - db.retentionTime.nanoseconds + let thresholdTimestamp = retentionTimestamp - db.retentionTime.nanoseconds div 10 + if thresholdTimestamp <= db.oldestReceiverTimestamp or db.oldestReceiverTimestamp == 0: return ok() + + var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " & + "WHERE receiverTimestamp < " & $retentionTimestamp + + let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard) + if res.isErr: + return err(res.error) + + info "Messages exceeding retention time deleted from DB. ", retentionTime=db.retentionTime + + db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query works") + + ok() + proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] = var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " & "WHERE id NOT IN " & @@ -72,8 +107,8 @@ proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] = ok() -proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] = - +proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000, isSqliteOnly = false, retentionTime = chronos.days(30).seconds): MessageStoreResult[T] = + let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration ## Table Creation let createStmt = db.prepareStmt(""" @@ -119,18 +154,28 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50 let storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow) deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2) - + + let wms = WakuMessageStore(database: db, numMessages: int(numMessages), storeCapacity: storeCapacity, storeMaxLoad: storeMaxLoad, deleteWindow: deleteWindow, + isSqliteOnly: isSqliteOnly, + retentionTime: retentionTime, + oldestReceiverTimestamp: db.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works."), insertStmt: insertStmt) - # If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object - if wms.numMessages >= wms.storeMaxLoad: + # if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object + if not isSqliteOnly and wms.numMessages >= wms.storeMaxLoad: let res = wms.deleteOldest() - if res.isErr: return err("deleting oldest messages failed") + if res.isErr: return err("deleting oldest messages failed: " & res.error()) + + # if using the sqlite-only store, delete messages exceeding the retention time + if isSqliteOnly: + debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp + let res = wms.deleteOldestTime() + if res.isErr: return err("deleting oldest messages (time) failed: " & res.error()) ok(wms) @@ -151,10 +196,20 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop return err("failed") db.numMessages += 1 - if db.numMessages >= db.storeMaxLoad: + # if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages + if not db.isSqliteOnly and db.numMessages >= db.storeMaxLoad: let res = db.deleteOldest() if res.isErr: return err("deleting oldest failed") + if db.isSqliteOnly: + # TODO: move to a timer job + # For this experimental version of the new store, it is OK to delete here, because it only actually triggers the deletion if there is a batch of messages older than the threshold. + # This only adds a few simple compare operations, if deletion is not necessary. + # Still, the put that triggers the deletion might return with a significant delay. + if db.oldestReceiverTimestamp == 0: db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works.") + let res = db.deleteOldestTime() + if res.isErr: return err("deleting oldest failed") + ok() method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] = @@ -211,6 +266,157 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto ok gotMessages +proc adjustDbPageSize(dbPageSize: uint64, matchCount: uint64, returnPageSize: uint64): uint64 {.inline.} = + var ret = if matchCount < 2: dbPageSize * returnPageSize + else: dbPageSize * (returnPageSize div matchCount) + trace "dbPageSize adjusted to: ", ret + ret + + +method getPage*(db: WakuMessageStore, + pred: QueryFilterMatcher, + pagingInfo: PagingInfo): + MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] = + ## Get a single page of history matching the predicate and + ## adhering to the pagingInfo parameters + + trace "getting page from SQLite DB", pagingInfo=pagingInfo + + let + responsePageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos + else: pagingInfo.pageSize + + var dbPageSize = responsePageSize # we retrieve larger pages from the DB for queries with (sparse) filters (TODO: improve adaptive dbPageSize increase) + + var cursor = pagingInfo.cursor + + var messages: seq[WakuMessage] + var + lastIndex: Index + numRecordsVisitedPage: uint64 = 0 # number of DB records visited during retrieving the last page from the DB + numRecordsVisitedTotal: uint64 = 0 # number of DB records visited in total + numRecordsMatchingPred: uint64 = 0 # number of records that matched the predicate on the last DB page; we use this as to gauge the sparseness of rows matching the filter. + + proc msg(s: ptr sqlite3_stmt) = # this is the actual onData proc that is passed to the query proc (the message store adds one indirection) + if uint64(messages.len) >= responsePageSize: return + let + receiverTimestamp = column_timestamp(s, 0) + + topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) + topicLength = sqlite3_column_bytes(s,1) + contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))) + + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) + length = sqlite3_column_bytes(s, 2) + payload = @(toOpenArray(p, 0, length-1)) + + pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) + pubsubTopicLength = sqlite3_column_bytes(s,3) + pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) + + version = sqlite3_column_int64(s, 4) + + senderTimestamp = column_timestamp(s, 5) + retMsg = WakuMessage(contentTopic: contentTopic, payload: payload, version: uint32(version), timestamp: Timestamp(senderTimestamp)) + # TODO: we should consolidate WakuMessage, Index, and IndexedWakuMessage; reason: avoid unnecessary copying and recalculation + index = retMsg.computeIndex(receiverTimestamp, pubsubTopic) # TODO: retrieve digest from DB + indexedWakuMsg = IndexedWakuMessage(msg: retMsg, index: index, pubsubTopic: pubsubTopic) # TODO: constructing indexedWakuMsg requires unnecessary copying + + lastIndex = index + numRecordsVisitedPage += 1 + try: + if pred(indexedWakuMsg): #TODO throws unknown exception + numRecordsMatchingPred += 1 + messages.add(retMsg) + except: + # TODO properly handle this exception + quit 1 + + # TODO: deduplicate / condense the following 4 DB query strings + # If no index has been set in pagingInfo, start with the first message (or the last in case of backwards direction) + if cursor == Index(): ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! + let noCursorQuery = + if pagingInfo.direction == PagingDirection.FORWARD: + "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & + "FROM " & TABLE_TITLE & " " & + "ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " & + "LIMIT " & $dbPageSize & ";" + else: + "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & + "FROM " & TABLE_TITLE & " " & + "ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " & + "LIMIT " & $dbPageSize & ";" + + let res = db.database.query(noCursorQuery, msg) + if res.isErr: + return err("failed to execute SQLite query: noCursorQuery") + numRecordsVisitedTotal = numRecordsVisitedPage + numRecordsVisitedPage = 0 + dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize) + numRecordsMatchingPred = 0 + cursor = lastIndex + + let preparedPageQuery = if pagingInfo.direction == PagingDirection.FORWARD: + db.database.prepareStmt( + "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & + "FROM " & TABLE_TITLE & " " & + "WHERE (senderTimestamp, id, pubsubTopic) > (?, ?, ?) " & + "ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " & + "LIMIT ?;", + (Timestamp, seq[byte], seq[byte], int64), # TODO: uint64 not supported yet + (Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), + ).expect("this is a valid statement") + else: + db.database.prepareStmt( + "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & + "FROM " & TABLE_TITLE & " " & + "WHERE (senderTimestamp, id, pubsubTopic) < (?, ?, ?) " & + "ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " & + "LIMIT ?;", + (Timestamp, seq[byte], seq[byte], int64), + (Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), + ).expect("this is a valid statement") + + + # TODO: DoS attack mitigation against: sending a lot of queries with sparse (or non-matching) filters making the store node run through the whole DB. Even worse with pageSize = 1. + while uint64(messages.len) < responsePageSize: + let res = preparedPageQuery.exec((cursor.senderTime, @(cursor.digest.data), cursor.pubsubTopic.toBytes(), dbPageSize.int64), msg) # TODO support uint64, pages large enough to cause an overflow are not expected... + if res.isErr: + return err("failed to execute SQLite prepared statement: preparedPageQuery") + numRecordsVisitedTotal += numRecordsVisitedPage + if numRecordsVisitedPage == 0: break # we are at the end of the DB (find more efficient/integrated solution to track that event) + numRecordsVisitedPage = 0 + cursor = lastIndex + dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize) + numRecordsMatchingPred = 0 + + let outPagingInfo = PagingInfo(pageSize: messages.len.uint, + cursor: lastIndex, + direction: pagingInfo.direction) + + let historyResponseError = if numRecordsVisitedTotal == 0: HistoryResponseError.INVALID_CURSOR # Index is not in DB (also if queried Index points to last entry) + else: HistoryResponseError.NONE + + preparedPageQuery.dispose() + + return ok((messages, outPagingInfo, historyResponseError)) # TODO: historyResponseError is not a "real error": treat as a real error + + + + +method getPage*(db: WakuMessageStore, + pagingInfo: PagingInfo): + MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] = + ## Get a single page of history without filtering. + ## Adhere to the pagingInfo parameters + + proc predicate(i: IndexedWakuMessage): bool = true # no filtering + + return getPage(db, predicate, pagingInfo) + + + + proc close*(db: WakuMessageStore) = ## Closes the database. db.insertStmt.dispose() diff --git a/waku/v2/node/storage/sqlite.nim b/waku/v2/node/storage/sqlite.nim index 48d924aec..acc7d63a1 100644 --- a/waku/v2/node/storage/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -172,8 +172,53 @@ proc exec*[P](s: SqliteStmt[P, void], params: P): DatabaseResult[void] = res +template readResult(s: RawStmtPtr, column: cint, T: type): auto = + when T is Option: + if sqlite3_column_type(s, column) == SQLITE_NULL: + none(typeof(default(T).get())) + else: + some(readSimpleResult(s, column, typeof(default(T).get()))) + else: + readSimpleResult(s, column, T) + +template readResult(s: RawStmtPtr, T: type): auto = + when T is tuple: + var res: T + var i = cint 0 + for field in fields(res): + field = readResult(s, i, typeof(field)) + inc i + res + else: + readResult(s, 0.cint, T) + type - DataProc* = proc(s: ptr sqlite3_stmt) {.closure.} + DataProc* = proc(s: ptr sqlite3_stmt) {.closure.} # the nim-eth definition is different; one more indirection + +proc exec*[Params, Res](s: SqliteStmt[Params, Res], + params: Params, + onData: DataProc): DatabaseResult[bool] = + let s = RawStmtPtr s + bindParams(s, params) + + try: + var gotResults = false + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + onData(s) + gotResults = true + of SQLITE_DONE: + break + else: + return err($sqlite3_errstr(v)) + return ok gotResults + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] = var s = prepare(db.env, query): discard @@ -295,4 +340,4 @@ proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration return err("failed to set the new user_version") debug "user_version is set to", targetVersion=targetVersion - ok(true) \ No newline at end of file + ok(true) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 3dc6539d3..a26dfc57d 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -483,15 +483,15 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.ra # NYI - Do we need this? #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) -proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity) {.raises: [Defect, LPError].} = +proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity, isSqliteOnly = false) {.raises: [Defect, LPError].} = info "mounting store" if node.wakuSwap.isNil: debug "mounting store without swap" - node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages, capacity=capacity) + node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages, capacity=capacity, isSqliteOnly=isSqliteOnly) else: debug "mounting store with swap" - node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages, capacity=capacity) + node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages, capacity=capacity, isSqliteOnly=isSqliteOnly) node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) @@ -1034,7 +1034,7 @@ when isMainModule: if conf.persistMessages: # Historical message persistence enable. Set up Message table in storage - let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity) + let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity, conf.sqliteStore, conf.sqliteRetentionTime) if res.isErr: warn "failed to init WakuMessageStore", err = res.error @@ -1237,7 +1237,7 @@ when isMainModule: # Store setup if (conf.storenode != "") or (conf.store): - mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity) + mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteStore) if conf.storenode != "": setStorePeer(node, conf.storenode) diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 6fab7d409..bedab39d1 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -34,7 +34,8 @@ export bearssl, minprotobuf, peer_manager, - waku_store_types + waku_store_types, + message_store declarePublicGauge waku_store_messages, "number of historical messages", ["type"] declarePublicGauge waku_store_peers, "number of store peers" @@ -56,6 +57,18 @@ const # TODO Move serialization function to separate file, too noisy # TODO Move pagination to separate file, self-contained logic +type + WakuStore* = ref object of LPProtocol + peerManager*: PeerManager + rng*: ref BrHmacDrbgContext + messages*: StoreQueueRef # in-memory message store + store*: MessageStore # sqlite DB handle + wakuSwap*: WakuSwap + persistMessages*: bool + #TODO: WakuMessageStore currenly also holds isSqliteOnly; put it in single place. + isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB + + proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().toUnixFloat()), pubsubTopic = DefaultTopic): Index = @@ -316,7 +329,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} let # Read a page of history matching the query - (wakuMsgList, updatedPagingInfo, error) = w.messages.getPage(matchesQuery, query.pagingInfo) + (wakuMsgList, updatedPagingInfo, error) = + if w.isSqliteOnly: w.store.getPage(matchesQuery, query.pagingInfo).expect("should return a valid result set") # TODO: error handling + else: w.messages.getPage(matchesQuery, query.pagingInfo) + # Build response historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error) @@ -363,6 +379,10 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = if ws.store.isNil: return + if ws.isSqliteOnly: + info "SQLite-only store initialized. Messages are *not* loaded into memory." + return + proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) = # TODO index should not be recalculated discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime, pubsubTopic), pubsubTopic: pubsubTopic)) @@ -382,9 +402,9 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, - capacity = DefaultStoreCapacity): T = + capacity = DefaultStoreCapacity, isSqliteOnly = false): T = debug "init" - var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages) + var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly) output.init(capacity) return output @@ -398,18 +418,21 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = # Store is mounted but new messages should not be stored return - # Handle WakuMessage according to store protocol - trace "handle message in WakuStore", topic=topic, msg=msg - let index = msg.computeIndex(pubsubTopic = topic) - let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) + + # add message to in-memory store + if not w.isSqliteOnly: + # Handle WakuMessage according to store protocol + trace "handle message in WakuStore", topic=topic, msg=msg + + let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) - if addRes.isErr: - trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() - waku_store_errors.inc(labelValues = [$(addRes.error())]) - return # Do not attempt to store in persistent DB + if addRes.isErr: + trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() + waku_store_errors.inc(labelValues = [$(addRes.error())]) + return # Do not attempt to store in persistent DB - waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) + waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) if w.store.isNil: return diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 6f4bea68d..f90bb27a5 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -11,7 +11,6 @@ import libp2p/protocols/protocol, stew/[results, sorted_set], # internal imports - ../../node/storage/message/message_store, ../../utils/pagination, ../../utils/time, ../../node/peer_manager/peer_manager, @@ -24,7 +23,6 @@ export results, peer_manager, waku_swap_types, - message_store, waku_message, pagination @@ -67,6 +65,11 @@ type cursor*: Index direction*: PagingDirection + HistoryResponseError* {.pure.} = enum + ## HistoryResponseError contains error message to inform the querying node about the state of its request + NONE = uint32(0) + INVALID_CURSOR = uint32(1) + HistoryQuery* = object contentFilters*: seq[HistoryContentFilter] pubsubTopic*: string @@ -74,11 +77,6 @@ type startTime*: Timestamp # used for time-window query endTime*: Timestamp # used for time-window query - HistoryResponseError* {.pure.} = enum - ## HistoryResponseError contains error message to inform the querying node about the state of its request - NONE = uint32(0) - INVALID_CURSOR = uint32(1) - HistoryResponse* = object messages*: seq[WakuMessage] pagingInfo*: PagingInfo # used for pagination @@ -109,13 +107,6 @@ type StoreQueueResult*[T] = Result[T, cstring] - WakuStore* = ref object of LPProtocol - peerManager*: PeerManager - rng*: ref BrHmacDrbgContext - messages*: StoreQueueRef # in-memory message store - store*: MessageStore # sqlite DB handle - wakuSwap*: WakuSwap - persistMessages*: bool ###################### # StoreQueue helpers # @@ -423,9 +414,9 @@ proc getPage*(storeQueue: StoreQueueRef, else: pagingInfo.pageSize case pagingInfo.direction - of FORWARD: + of PagingDirection.FORWARD: return storeQueue.fwdPage(pred, maxPageSize, cursorOpt) - of BACKWARD: + of PagingDirection.BACKWARD: return storeQueue.bwdPage(pred, maxPageSize, cursorOpt) proc getPage*(storeQueue: StoreQueueRef,