diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 40ec52981..68892b3c0 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -11,7 +11,8 @@ import ./v2/test_utils_pagination, ./v2/test_message_store_queue, ./v2/test_message_store_queue_pagination, - ./v2/test_message_store, + ./v2/test_message_store_sqlite_query, + ./v2/test_message_store_sqlite, ./v2/test_jsonrpc_waku, ./v2/test_rest_serdes, ./v2/test_rest_debug_api_serdes, diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim deleted file mode 100644 index e3d8ffbd2..000000000 --- a/tests/v2/test_message_store.nim +++ /dev/null @@ -1,562 +0,0 @@ -{.used.} - -import - std/[unittest, options, tables, sets, times, os, strutils], - chronicles, - chronos, - sqlite3_abi, - stew/byteutils, - ../../waku/v2/node/storage/message/waku_message_store, - ../../waku/v2/node/storage/message/waku_store_queue, - ../../waku/v2/node/storage/sqlite, - ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_store, - ../../waku/v2/utils/time, - ../../waku/v2/utils/pagination, - ./utils - - -suite "Message Store": - test "set and get works": - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - topic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - - t1 = getNanosecondTime(epochTime()) - t2 = getNanosecondTime(epochTime()) - t3 = high(int64) - var msgs = @[ - WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1), - WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2), - WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3), - ] - - defer: store.close() - - var indexes: seq[Index] = @[] - for msg in msgs: - var index = computeIndex(msg) - let output = store.put(index, msg, pubsubTopic) - check output.isOk - indexes.add(index) - - - # flags for version - var v0Flag, v1Flag, vMaxFlag: bool = false - # flags for sender timestamp - var t1Flag, t2Flag, t3Flag: bool = false - # flags for receiver timestamp - var rt1Flag, rt2Flag, rt3Flag: bool = false - # flags for message/pubsubTopic (default true) - var msgFlag, psTopicFlag = true - - var responseCount = 0 - proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = - responseCount += 1 - - # Note: cannot use `check` within `{.raises: [Defect].}` block: - # @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception - if msg notin msgs: - msgFlag = false - - if psTopic != pubsubTopic: - psTopicFlag = false - - # check the correct retrieval of versions - if msg.version == uint32(0): v0Flag = true - if msg.version == uint32(1): v1Flag = true - # high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage - if msg.version == high(uint32): vMaxFlag = true - - # check correct retrieval of sender timestamps - if msg.timestamp == t1: t1Flag = true - if msg.timestamp == t2: t2Flag = true - if msg.timestamp == t3: t3Flag = true - - # check correct retrieval of receiver timestamps - if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true - if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true - if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true - - - let res = store.getAll(data) - - check: - res.isErr == false - responseCount == 3 - # check version - v0Flag == true - v1Flag == true - vMaxFlag == true - # check sender timestamp - t1Flag == true - t2Flag == true - t3Flag == true - # check receiver timestamp - rt1Flag == true - rt2Flag == true - rt3Flag == true - # check messages and pubsubTopic - msgFlag == true - psTopicFlag == true - test "set and get user version": - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - defer: store.close() - - let res = database.setUserVersion(5) - check res.isErr == false - - let ver = database.getUserVersion() - check: - ver.isErr == false - ver.value == 5 - test "migration": - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - defer: store.close() - - template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] - let migrationPath = sourceDir - - let res = database.migrate(migrationPath, 10) - check: - res.isErr == false - - let ver = database.getUserVersion() - check: - ver.isErr == false - ver.value == 10 - - test "number of messages retrieved by getAll is bounded by storeCapacity": - let - database = SqliteDatabase.init("", inMemory = true)[] - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - capacity = 10 - store = WakuMessageStore.init(database, capacity)[] - - defer: store.close() - - for i in 1..capacity: - let - msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) - index = computeIndex(msg) - output = store.put(index, msg, pubsubTopic) - check output.isOk - - var - responseCount = 0 - lastMessageTimestamp = Timestamp(0) - - proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = - responseCount += 1 - lastMessageTimestamp = msg.timestamp - - # Test limited getAll function when store is at capacity - let resMax = store.getAll(data) - - check: - resMax.isOk - responseCount == capacity # We retrieved all items - lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution - - test "DB store capacity": - let - database = SqliteDatabase.init("", inMemory = true)[] - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - capacity = 100 - overload = 65 - store = WakuMessageStore.init(database, capacity)[] - - defer: store.close() - - for i in 1..capacity+overload: - let - msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) - index = computeIndex(msg) - output = store.put(index, msg, pubsubTopic) - check output.isOk - - # count messages in DB - var numMessages: int64 - proc handler(s: ptr sqlite3_stmt) = - numMessages = sqlite3_column_int64(s, 0) - let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store - discard store.database.query(countQuery, handler) - - check: - # expected number of messages is 120 because - # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) - # the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store - numMessages == 120 - - -suite "Message Store: Retrieve Pages": - setup: - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - - t1 = getNanosecondTime(epochTime())-800 - t2 = getNanosecondTime(epochTime())-700 - t3 = getNanosecondTime(epochTime())-600 - t4 = getNanosecondTime(epochTime())-500 - t5 = getNanosecondTime(epochTime())-400 - t6 = getNanosecondTime(epochTime())-300 - t7 = getNanosecondTime(epochTime())-200 - t8 = getNanosecondTime(epochTime())-100 - t9 = getNanosecondTime(epochTime()) - - var msgs = @[ - WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1), - WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2), - WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3), - WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4), - WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5), - WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6), - WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7), - WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8), - WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9), - ] - - var indexes: seq[Index] = @[] - for msg in msgs: - var index = computeIndex(msg) - let output = store.put(index, msg, pubsubTopic) - check output.isOk - indexes.add(index) - - teardown: - store.close() - - test "get forward page": - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[1], - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 3, - cursor: indexes[4], - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 3 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[k+2] == m # offset of two because we used the second message (indexes[1]) as cursor; the cursor is excluded in the returned page - - test "get backward page": - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[4], - direction: PagingDirection.BACKWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 3, - cursor: indexes[1], - direction: PagingDirection.BACKWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 3 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[^(k+6)] == m - - - test "get forward page (default index)": - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - # we don't set an index here to test the default index - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 3, - cursor: indexes[2], - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 3 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[k] == m - - - test "get backward page (default index)": - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - # we don't set an index here to test the default index - direction: PagingDirection.BACKWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 3, - cursor: indexes[6], - direction: PagingDirection.BACKWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 3 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[^(k+1)] == m - - - test "get large forward page": - let maxPageSize = 12'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - # we don't set an index here; start at the beginning - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 9, # there are only 10 msgs in total in the DB - cursor: indexes[8], - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 9 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[k] == m - - - test "get large backward page": - let maxPageSize = 12'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - # we don't set an index here to test the default index - direction: PagingDirection.BACKWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 9, - cursor: indexes[0], - direction: PagingDirection.BACKWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 9 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[^(k+1)] == m - - - test "get filtered page, maxPageSize == number of matching messages": - proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = - # filter on timestamp - if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t6: - return false - - return true - - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[1], - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 3, - cursor: indexes[5], - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 3 - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window - - test "get filtered page, maxPageSize > number of matching messages": - proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = - # filter on timestamp - if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t5: - return false - return true - - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[1], - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 2, - cursor: indexes[8], # index is advanced by one because one message was not accepted by the filter - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 2 # only two messages are in the time window going through the filter - outPagingInfo == expectedOutPagingInfo - - for (k, m) in outMessages.pairs(): - check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window - - - test "get page with index that is not in the DB": - let maxPageSize = 3'u64 - - let nonStoredMsg = WakuMessage(payload: @[byte 13], contentTopic: "hello", version: uint32(3), timestamp: getNanosecondTime(epochTime())) - var nonStoredIndex = computeIndex(nonStoredMsg) - - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: nonStoredIndex, - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected - cursor: Index(), - direction: PagingDirection.FORWARD) - check: - outError == HistoryResponseError.INVALID_CURSOR - outMessages.len == 0 # only two messages are in the time window going through the filter - outPagingInfo == expectedOutPagingInfo - - test "ask for last index, forward": - let maxPageSize = 3'u64 - - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[8], - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the last index - cursor: Index(), # empty index, because we went beyond the last index - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid? - outMessages.len == 0 # no message expected, because we already have the last index - outPagingInfo == expectedOutPagingInfo - - test "ask for first index, backward": - let maxPageSize = 3'u64 - - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[0], - direction: PagingDirection.BACKWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the first index (and go trough backwards) - cursor: Index(), # empty index, because we went beyond the first index (backwards) - direction: PagingDirection.BACKWARD) - - check: - outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid? - outMessages.len == 0 # no message expected, because we already have the first index - outPagingInfo == expectedOutPagingInfo - - - - test "valid index but no predicate matches": - proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = - return false - - let maxPageSize = 3'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - cursor: indexes[1], - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 0, - cursor: indexes[8], # last index; DB was searched until the end (no message matched the predicate) - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 0 # predicate is false for each message - outPagingInfo == expectedOutPagingInfo - - test "sparse filter (-> merging several DB pages into one reply page)": - proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} = - if indMsg.msg.payload[0] mod 4 == 0: - return true - - let maxPageSize = 2'u64 - let pagingInfo = PagingInfo(pageSize: maxPageSize, - # we don't set an index here, starting from the beginning. This also covers the inital special case of the retrieval loop. - direction: PagingDirection.FORWARD) - let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get() - - let expectedOutPagingInfo = PagingInfo(pageSize: 2, - cursor: indexes[7], - direction: PagingDirection.FORWARD) - - check: - outError == HistoryResponseError.NONE - outMessages.len == 2 - outPagingInfo == expectedOutPagingInfo - outMessages[0] == msgs[3] - outMessages[1] == msgs[7] - -test "Message Store: Retention Time": # TODO: better retention time test coverage - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database, isSqliteOnly=true, retentionTime=100)[] - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - - t1 = getNanosecondTime(epochTime())-300_000_000_000 - t2 = getNanosecondTime(epochTime())-200_000_000_000 - t3 = getNanosecondTime(epochTime())-100_000_000_000 - t4 = getNanosecondTime(epochTime())-50_000_000_000 - t5 = getNanosecondTime(epochTime())-40_000_000_000 - t6 = getNanosecondTime(epochTime())-3_000_000_000 - t7 = getNanosecondTime(epochTime())-2_000_000_000 - t8 = getNanosecondTime(epochTime())-1_000_000_000 - t9 = getNanosecondTime(epochTime()) - - var msgs = @[ - WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1), - WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2), - WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3), - WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4), - WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5), - WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6), - WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7), - WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8), - WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9), - ] - - var indexes: seq[Index] = @[] - for msg in msgs: - var index = computeIndex(msg, receivedTime = msg.timestamp) - let output = store.put(index, msg, pubsubTopic) - check output.isOk - indexes.add(index) - - # let maxPageSize = 9'u64 - # let pagingInfo = PagingInfo(pageSize: maxPageSize, - # direction: PagingDirection.FORWARD) - # let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get() - - # let expectedOutPagingInfo = PagingInfo(pageSize: 7, - # cursor: indexes[8], - # direction: PagingDirection.FORWARD) - - # check: - # outError == HistoryResponseError.NONE - # outMessages.len == 7 - # outPagingInfo == expectedOutPagingInfo - - # for (k, m) in outMessages.pairs(): - # check: msgs[k+2] == m # offset of two because the frist two messages got deleted - - # store.close() - - diff --git a/tests/v2/test_message_store_queue_pagination.nim b/tests/v2/test_message_store_queue_pagination.nim index 851564e1f..5ace2ca3f 100644 --- a/tests/v2/test_message_store_queue_pagination.nim +++ b/tests/v2/test_message_store_queue_pagination.nim @@ -14,8 +14,8 @@ import const - DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto" - DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto") + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") proc getTestStoreQueue(numMessages: int): StoreQueueRef = @@ -41,7 +41,6 @@ proc getTestTimestamp(): Timestamp = let now = getNanosecondTime(epochTime()) Timestamp(now) - suite "Queue store - pagination": test "Forward pagination test": var @@ -123,7 +122,7 @@ suite "Queue store - pagination": error == HistoryResponseError.NONE # test for an invalid cursor - let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DEFAULT_PUBSUB_TOPIC) + let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD) (data, newPagingInfo, error) = getPage(stQ, pagingInfo) check: @@ -234,7 +233,7 @@ suite "Queue store - pagination": error == HistoryResponseError.NONE # test for an invalid cursor - let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DEFAULT_PUBSUB_TOPIC) + let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD) (data, newPagingInfo, error) = getPage(stQ, pagingInfo) check: diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim new file mode 100644 index 000000000..b788e5410 --- /dev/null +++ b/tests/v2/test_message_store_sqlite.nim @@ -0,0 +1,329 @@ +{.used.} + +import + std/[unittest, options, tables, sets, times, strutils, sequtils, os], + stew/byteutils, + chronos, + chronicles, + sqlite3_abi +import + ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/sqlite, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/utils/time, + ../../waku/v2/utils/pagination, + ./utils + + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.init("", inMemory = true).tryGet() + +proc getTestTimestamp(offset=0): Timestamp = + Timestamp(getNanosecondTime(epochTime())) + +proc fakeWakuMessage( + payload = "TEST-PAYLOAD", + contentTopic = DefaultContentTopic, + ts = getNanosecondTime(epochTime()) +): WakuMessage = + WakuMessage( + payload: toBytes(payload), + contentTopic: contentTopic, + version: 1, + timestamp: ts + ) + + +suite "SQLite message store - init store": + test "init store": + ## Given + const storeCapacity = 20 + const retentionTime = days(20).seconds + + let database = newTestDatabase() + + ## When + let resStore = WakuMessageStore.init(database, capacity=storeCapacity, retentionTime=retentionTime) + + ## Then + check: + resStore.isOk() + + let store = resStore.tryGet() + check: + not store.isNil() + + ## Teardown + store.close() + + test "init store with prepopulated database with messages older than retention policy": + # TODO: Implement initialization test cases + discard + + test "init store with prepopulated database with messsage count greater than max capacity": + # TODO: Implement initialization test cases + discard + + +# TODO: Add test cases to cover the store retention time fucntionality +suite "SQLite message store - insert messages": + test "insert a message": + ## Given + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database).tryGet() + + let message = fakeWakuMessage(contentTopic=contentTopic) + let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic) + + ## When + let resPut = store.put(messageIndex, message, DefaultPubsubTopic) + + ## Then + check: + resPut.isOk() + + let storedMsg = store.getAllMessages().tryGet() + check: + storedMsg.len == 1 + storedMsg.all do (item: auto) -> bool: + let (_, msg, pubsubTopic) = item + msg.contentTopic == contentTopic and + pubsubTopic == DefaultPubsubTopic + + ## Teardown + store.close() + + test "store capacity should be limited": + ## Given + const storeCapacity = 5 + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6) + ] + + ## When + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## Then + let storedMsg = store.getAllMessages().tryGet() + check: + storedMsg.len == storeCapacity + storedMsg.all do (item: auto) -> bool: + let (_, msg, pubsubTopic) = item + msg.contentTopic == contentTopic and + pubsubTopic == DefaultPubsubTopic + + ## Teardown + store.close() + + +# TODO: Review the following suite test cases +suite "Message Store": + test "set and get works": + ## Given + let + database = newTestDatabase() + store = WakuMessageStore.init(database).get() + topic = DefaultContentTopic + pubsubTopic = DefaultPubsubTopic + + t1 = getTestTimestamp(0) + t2 = getTestTimestamp(1) + t3 = high(int64) + + var msgs = @[ + WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1), + WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2), + # high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage + WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3), + ] + + var indexes: seq[Index] = @[] + for msg in msgs: + var index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, pubsubTopic) + require resPut.isOk + indexes.add(index) + + ## When + let res = store.getAllMessages() + + ## Then + check: + res.isOk() + + let result = res.value + check: + result.len == 3 + + # flags for version + var v0Flag, v1Flag, vMaxFlag: bool = false + # flags for sender timestamp + var t1Flag, t2Flag, t3Flag: bool = false + # flags for receiver timestamp + var rt1Flag, rt2Flag, rt3Flag: bool = false + + for (receiverTimestamp, msg, psTopic) in result: + # check correct retrieval of receiver timestamps + if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true + if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true + if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true + + check: + msg in msgs + + # check the correct retrieval of versions + if msg.version == uint32(0): v0Flag = true + if msg.version == uint32(1): v1Flag = true + if msg.version == high(uint32): vMaxFlag = true + + # check correct retrieval of sender timestamps + if msg.timestamp == t1: t1Flag = true + if msg.timestamp == t2: t2Flag = true + if msg.timestamp == t3: t3Flag = true + + check: + psTopic == pubSubTopic + + check: + # check version + v0Flag == true + v1Flag == true + vMaxFlag == true + # check sender timestamp + t1Flag == true + t2Flag == true + t3Flag == true + # check receiver timestamp + rt1Flag == true + rt2Flag == true + rt3Flag == true + + ## Cleanup + store.close() + + test "set and get user version": + ## Given + let + database = newTestDatabase() + store = WakuMessageStore.init(database).get() + + ## When + let resSetVersion = database.setUserVersion(5) + let resGetVersion = database.getUserVersion() + + ## Then + check: + resSetVersion.isOk() + resGetVersion.isOk() + + let version = resGetVersion.tryGet() + check: + version == 5 + + ## Cleanup + store.close() + + test "migration": + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + defer: store.close() + + template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + let migrationPath = sourceDir + + let res = database.migrate(migrationPath, 10) + check: + res.isErr == false + + let ver = database.getUserVersion() + check: + ver.isErr == false + ver.value == 10 + + test "number of messages retrieved by getAll is bounded by storeCapacity": + let + database = SqliteDatabase.init("", inMemory = true)[] + contentTopic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic = "/waku/2/default-waku/proto" + capacity = 10 + store = WakuMessageStore.init(database, capacity)[] + + + for i in 1..capacity: + let + msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) + index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic) + output = store.put(index, msg, pubsubTopic) + check output.isOk + + # Test limited getAll function when store is at capacity + let resMax = store.getAllMessages() + + ## THen + check: + resMax.isOk() + + let response = resMax.tryGet() + let lastMessageTimestamp = response[^1][1].timestamp + check: + response.len == capacity # We retrieved all items + lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution + + ## Cleanup + store.close() + + test "DB store capacity": + let + database = SqliteDatabase.init("", inMemory = true)[] + contentTopic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic = "/waku/2/default-waku/proto" + capacity = 100 + overload = 65 + store = WakuMessageStore.init(database, capacity)[] + + defer: store.close() + + for i in 1..capacity+overload: + let + msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) + index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic) + output = store.put(index, msg, pubsubTopic) + check output.isOk + + # count messages in DB + var numMessages: int64 + proc handler(s: ptr sqlite3_stmt) = + numMessages = sqlite3_column_int64(s, 0) + let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store + discard database.query(countQuery, handler) + + check: + # expected number of messages is 120 because + # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) + # the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store + numMessages == 120 \ No newline at end of file diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim new file mode 100644 index 000000000..be8c1a221 --- /dev/null +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -0,0 +1,656 @@ +{.used.} + +import + std/[options, tables, sets, times, strutils, sequtils], + stew/byteutils, + unittest2, + chronos, + chronicles, + ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/sqlite, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/utils/time, + ../../waku/v2/utils/pagination, + ./utils + + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.init("", inMemory = true).tryGet() + +proc fakeWakuMessage( + payload = "TEST-PAYLOAD", + contentTopic = DefaultContentTopic, + ts = getNanosecondTime(epochTime()) +): WakuMessage = + WakuMessage( + payload: toBytes(payload), + contentTopic: contentTopic, + version: 1, + timestamp: ts + ) + + +suite "message store - history query": + + test "single content topic": + ## Given + const storeCapacity = 20 + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages[2..3] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "single content topic and descending order": + ## Given + const storeCapacity = 20 + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + maxPageSize=2, + ascendingOrder=false + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages[6..7] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "multiple content topic": + ## Given + const storeCapacity = 20 + const contentTopic1 = "test-content-topic-1" + const contentTopic2 = "test-content-topic-2" + const contentTopic3 = "test-content-topic-3" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 3), + + fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 7), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic1, contentTopic2]), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic in @[contentTopic1, contentTopic2] + filteredMessages == messages[2..3] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "content topic and pubsub topic": + ## Given + const storeCapacity = 20 + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages1 = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + ] + for msg in messages1: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + let messages2 = @[ + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + ] + for msg in messages2: + let index = Index.compute(msg, msg.timestamp, pubsubTopic) + let resPut = store.put(index, msg, pubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + pubsubTopic=some(pubsubTopic), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages2[0..1] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "content topic and cursor": + ## Given + const storeCapacity = 20 + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages[5..6] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "content topic, cursor and descending order": + ## Given + const storeCapacity = 20 + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=false + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages[4..5] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "content topic, pubsub topic and cursor": + ## Given + const storeCapacity = 20 + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages1 = @[ + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), + fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), + ] + for msg in messages1: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + let messages2 = @[ + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), + fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), + ] + for msg in messages2: + let index = Index.compute(msg, msg.timestamp, pubsubTopic) + let resPut = store.put(index, msg, pubsubTopic) + require(resPut.isOk()) + + let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages2[0..1] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "single content topic - no results": + ## Given + const storeCapacity = 10 + const contentTopic = "test-content-topic" + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2), + fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 3), + fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 4), + fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 5), + fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 6), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 0 + pagingInfo.isNone() + + ## Teardown + store.close() + + test "single content topic and valid time range": + ## Given + let + storeCapacity = 10 + contentTopic = "test-content-topic" + timeOrigin = getNanosecondTime(epochTime()) + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), + + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30), + + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + startTime=some(timeOrigin + 5), + endTime=some(timeOrigin + 35), + maxPageSize=2, + ascendingOrder=true + ) + + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 2 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages[1..2] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "single content topic and invalid time range - no results": + ## Given + let + storeCapacity = 10 + contentTopic = "test-content-topic" + timeOrigin = getNanosecondTime(epochTime()) + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10), + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + startTime=some(timeOrigin + 35), + endTime=some(timeOrigin + 10), + maxPageSize=2, + ascendingOrder=true + ) + + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 0 + pagingInfo.isNone() + + ## Teardown + store.close() + + test "single content topic and only time range start": + ## Given + let + storeCapacity = 10 + contentTopic = "test-content-topic" + timeOrigin = getNanosecondTime(epochTime()) + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10), + + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30), + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + startTime=some(timeOrigin + 15), + ascendingOrder=false + ) + + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 3 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == messages[2..4] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() + + test "single content topic, cursor and only time range start": + ## Given + let + storeCapacity = 10 + contentTopic = "test-content-topic" + timeOrigin = getNanosecondTime(epochTime()) + + let + database = newTestDatabase() + store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() + + let messages = @[ + fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), + fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10), + + fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20), + fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30), + + fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50), + ] + + for msg in messages: + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) + let resPut = store.put(index, msg, DefaultPubsubTopic) + require(resPut.isOk()) + + let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) + + ## When + let res = store.getMessagesByHistoryQuery( + contentTopic=some(@[contentTopic]), + cursor=some(cursor), + startTime=some(timeOrigin + 15), + maxPageSize=2, + ascendingOrder=true + ) + + check: + res.isOk() + + let (filteredMessages, pagingInfo) = res.tryGet() + check: + filteredMessages.len == 1 + filteredMessages.all do (msg: WakuMessage) -> bool: + msg.contentTopic == contentTopic + filteredMessages == @[messages[^1]] + + check: + pagingInfo.isSome() + + ## Teardown + store.close() diff --git a/tests/v2/test_utils_pagination.nim b/tests/v2/test_utils_pagination.nim index a2b4d04c0..1e24e3f99 100644 --- a/tests/v2/test_utils_pagination.nim +++ b/tests/v2/test_utils_pagination.nim @@ -12,8 +12,8 @@ import const - DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto" - DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto") + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") ## Helpers @@ -158,7 +158,7 @@ suite "Pagination - Index": ## When let ts2 = getTestTimestamp() + 10 - let index = Index.compute(wm, ts2, DEFAULT_CONTENT_TOPIC) + let index = Index.compute(wm, ts2, DefaultContentTopic) ## Then check: @@ -166,7 +166,7 @@ suite "Pagination - Index": index.digest.data.len == 32 # sha2 output length in bytes index.receiverTime == ts2 # the receiver timestamp should be a non-zero value index.senderTime == ts - index.pubsubTopic == DEFAULT_CONTENT_TOPIC + index.pubsubTopic == DefaultContentTopic test "Index digest of two identical messsage should be the same": ## Given @@ -178,8 +178,8 @@ suite "Pagination - Index": ## When let ts = getTestTimestamp() let - index1 = Index.compute(wm1, ts, DEFAULT_PUBSUB_TOPIC) - index2 = Index.compute(wm2, ts, DEFAULT_PUBSUB_TOPIC) + index1 = Index.compute(wm1, ts, DefaultPubsubTopic) + index2 = Index.compute(wm2, ts, DefaultPubsubTopic) ## Then check: diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index e1e1a91ec..1200a36cc 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -15,8 +15,8 @@ import const - DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto" - DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto") + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") # TODO: Extend lightpush protocol test coverage @@ -69,8 +69,8 @@ procSuite "Waku Lightpush": ## Given let - msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DEFAULT_CONTENT_TOPIC) - rpc = PushRequest(message: msg, pubSubTopic: DEFAULT_PUBSUB_TOPIC) + msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) + rpc = PushRequest(message: msg, pubSubTopic: DefaultPubsubTopic) ## When let res = await proto.request(rpc) diff --git a/tests/v2/test_waku_store_rpc_codec.nim b/tests/v2/test_waku_store_rpc_codec.nim index e1409a796..982a07e32 100644 --- a/tests/v2/test_waku_store_rpc_codec.nim +++ b/tests/v2/test_waku_store_rpc_codec.nim @@ -13,13 +13,13 @@ import ../../waku/v2/utils/time const - DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto" - DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto") + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") proc fakeWakuMessage( payload = "TEST-PAYLOAD", - contentTopic = DEFAULT_CONTENT_TOPIC, + contentTopic = DefaultContentTopic, ts = getNanosecondTime(epochTime()) ): WakuMessage = WakuMessage( @@ -34,7 +34,7 @@ procSuite "Waku Store - RPC codec": test "Index protobuf codec": ## Given - let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC) + let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) ## When let encodedIndex = index.encode() @@ -68,7 +68,7 @@ procSuite "Waku Store - RPC codec": test "PagingInfo protobuf codec": ## Given let - index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC) + index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD) ## When @@ -103,9 +103,9 @@ procSuite "Waku Store - RPC codec": test "HistoryQuery protobuf codec": ## Given let - index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC) + index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) - query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DEFAULT_CONTENT_TOPIC), HistoryContentFilter(contentTopic: DEFAULT_CONTENT_TOPIC)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) + query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic), HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) ## When let pb = query.encode() @@ -139,7 +139,7 @@ procSuite "Waku Store - RPC codec": ## Given let message = fakeWakuMessage() - index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC) + index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 39c7fe9e6..2fb306f99 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -22,14 +22,18 @@ import ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, + ../../waku/v2/utils/pagination, ../../waku/v2/utils/time, ../../waku/v2/node/wakunode2 + +from std/times import epochTime + + when defined(rln): import std/sequtils import ../../waku/v2/protocol/waku_rln_relay/[waku_rln_relay_utils, waku_rln_relay_types] - from times import epochTime const RLNRELAY_PUBSUB_TOPIC = "waku/2/rlnrelay/proto" template sourceDir: string = currentSourcePath.parentDir() @@ -1197,29 +1201,25 @@ procSuite "WakuNode": # populate db with msg1 to be a duplicate - let index1 = computeIndex(msg1) + let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic) let output1 = store.put(index1, msg1, DefaultTopic) check output1.isOk - discard node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic)) + discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic) # now run the resume proc await node1.resume() # count the total number of retrieved messages from the database - var responseCount = 0 - proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) = - responseCount += 1 - # retrieve all the messages in the db - let res = store.getAll(data) + let res = store.getAllMessages() check: - res.isErr == false + res.isOk() check: # if the duplicates are discarded properly, then the total number of messages after resume should be 2 # check no duplicates is in the messages field node1.wakuStore.messages.len == 2 # check no duplicates is in the db - responseCount == 2 + res.value.len == 2 await node1.stop() await node2.stop() diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index dc0771054..183932251 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -9,7 +9,6 @@ import chronos import ../../../protocol/waku_message, - ../../../protocol/waku_store/rpc, ../../../utils/time, ../../../utils/pagination @@ -31,41 +30,14 @@ type MessageStore* = ref object of RootObj -# TODO: Deprecate the following type -type DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].} - - -# TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim -type - IndexedWakuMessage* = object - # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message - ## This type is used to encapsulate a WakuMessage and its Index - msg*: WakuMessage - index*: Index - pubsubTopic*: string - - QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} - # MessageStore interface -method getMostRecentMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard +method put*(ms: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard -method getOldestMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard - -method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard - - -# TODO: Deprecate the following methods after after #1026 -method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard -method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard -method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard - - -# TODO: Move to sqlite store -method getAllMessages(db: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard +method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard method getMessagesByHistoryQuery*( - db: MessageStore, + ms: MessageStore, contentTopic = none(seq[ContentTopic]), pubsubTopic = none(string), cursor = none(Index), diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index aff2b2849..c96f827b5 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -1,425 +1,269 @@ +# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. +# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim {.push raises: [Defect].} import - std/[options, tables, times], + std/[options, tables, times, sequtils, algorithm], stew/[byteutils, results], - chronos, chronicles, + chronos, sqlite3_abi import + ./message_store, + ../sqlite, ../../../protocol/waku_message, - ../../../protocol/waku_store, ../../../utils/pagination, ../../../utils/time, - ../sqlite, - ./message_store, - ./waku_store_queue + ./waku_message_store_queries export sqlite logScope: - topics = "wakuMessageStore" + topics = "message_store.sqlite" -const TABLE_TITLE = "Message" -const MaxStoreOverflow = 1.3 # has to be > 1.0 - -# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. -# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim type # WakuMessageStore implements auto deletion as follows: - # The sqlite DB will store up to `storeMaxLoad = storeCapacity` * `MaxStoreOverflow` messages, giving an overflow window of (storeCapacity*MaxStoreOverflow - storeCapacity). - # In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are deleted. The number of messages that get deleted is (overflow window / 2) = deleteWindow, - # bringing the total number of stored messages back to `storeCapacity + (overflow window / 2)`. The rationale for batch deleting is efficiency. - # We keep half of the overflow window in addition to `storeCapacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of `senderTimestamp`. - # `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting `senderTimestamp`. - # However, `receiverTimestamp` can differ from node to node for the same message. - # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we compensate that by keeping half of the overflow window. + # - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages, + # giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`. + # + # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are + # deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`, + # bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`. + # + # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition + # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of + # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting + # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message. + # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we + # compensate that by keeping half of the overflow window. WakuMessageStore* = ref object of MessageStore - database*: SqliteDatabase + db: SqliteDatabase numMessages: int - storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. - storeMaxLoad: int # = storeCapacity * MaxStoreOverflow - deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs + capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. + totalCapacity: int # = capacity * StoreMaxOverflow + deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs isSqliteOnly: bool retentionTime: chronos.Duration oldestReceiverTimestamp: int64 - insertStmt: SqliteStmt[(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), void] + insertStmt: SqliteStmt[InsertMessageParams, void] -proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] = - var numMessages: int64 - proc handler(s: ptr sqlite3_stmt) = - numMessages = sqlite3_column_int64(s, 0) - let countQuery = "SELECT COUNT(*) FROM " & TABLE_TITLE - let countRes = db.query(countQuery, handler) - if countRes.isErr: - return err("failed to count number of messages in DB") - ok(numMessages) + +proc calculateTotalCapacity(capacity: int, overflow: float): int {.inline.} = + int(float(capacity) * overflow) + +proc calculateOverflowWindow(capacity: int, overflow: float): int {.inline.} = + int(float(capacity) * (overflow - 1)) + +proc calculateDeleteWindow(capacity: int, overflow: float): int {.inline.} = + calculateOverflowWindow(capacity, overflow) div 2 -proc getOldestDbReceiverTimestamp(db: SqliteDatabase): MessageStoreResult[int64] = - var oldestReceiverTimestamp: int64 - proc handler(s: ptr sqlite3_stmt) = - oldestReceiverTimestamp = column_timestamp(s, 0) - let query = "SELECT MIN(receiverTimestamp) FROM " & TABLE_TITLE; - let queryRes = db.query(query, handler) - if queryRes.isErr: - return err("failed to get the oldest receiver timestamp from the DB") - ok(oldestReceiverTimestamp) +### Store implementation -proc deleteOldestTime(db: WakuMessageStore): MessageStoreResult[void] = - # delete if there are messages in the DB that exceed the retention time by 10% and more (batch delete for efficiency) - let retentionTimestamp = getNanosecondTime(getTime().toUnixFloat()) - db.retentionTime.nanoseconds - let thresholdTimestamp = retentionTimestamp - db.retentionTime.nanoseconds div 10 - if thresholdTimestamp <= db.oldestReceiverTimestamp or db.oldestReceiverTimestamp == 0: return ok() +proc deleteMessagesExceedingRetentionTime(s: WakuMessageStore): MessageStoreResult[void] = + ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency) + if s.oldestReceiverTimestamp == 0: + return ok() + + let now = getNanosecondTime(getTime().toUnixFloat()) + let retentionTimestamp = now - s.retentionTime.nanoseconds + let thresholdTimestamp = retentionTimestamp - s.retentionTime.nanoseconds div 10 + if thresholdTimestamp <= s.oldestReceiverTimestamp: + return ok() - var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " & - "WHERE receiverTimestamp < " & $retentionTimestamp - - let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard) - if res.isErr: - return err(res.error) - - info "Messages exceeding retention time deleted from DB. ", retentionTime=db.retentionTime - - db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query works") + s.db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) +proc deleteMessagesOverflowingTotalCapacity(s: WakuMessageStore): MessageStoreResult[void] = + ?s.db.deleteOldestMessagesNotWithinLimit(limit=s.capacity + s.deleteWindow) + info "Oldest messages deleted from db due to overflow.", capacity=s.capacity, maxStore=s.totalCapacity, deleteWindow=s.deleteWindow ok() -proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] = - var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " & - "WHERE id NOT IN " & - "(SELECT id FROM " & TABLE_TITLE & " " & - "ORDER BY receiverTimestamp DESC " & - "LIMIT " & $(db.storeCapacity + db.deleteWindow) & ")" - let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard) - if res.isErr: - return err(res.error) - db.numMessages = db.storeCapacity + db.deleteWindow # sqlite3 DELETE does not return the number of deleted rows; Ideally we would subtract the number of actually deleted messages. We could run a separate COUNT. - info "Oldest messages deleted from DB due to overflow.", storeCapacity=db.storeCapacity, maxStore=db.storeMaxLoad, deleteWindow=db.deleteWindow - when defined(debug): - let numMessages = messageCount(db.database).get() # requires another SELECT query, so only run in debug mode - debug "Number of messages left after delete operation.", messagesLeft=numMessages - - ok() - -proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000, isSqliteOnly = false, retentionTime = chronos.days(30).seconds): MessageStoreResult[T] = +proc init*(T: type WakuMessageStore, db: SqliteDatabase, + capacity: int = StoreDefaultCapacity, + isSqliteOnly = false, + retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] = let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration - ## Table Creation - let - createStmt = db.prepareStmt(""" - CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( - id BLOB, - receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, - contentTopic BLOB NOT NULL, - pubsubTopic BLOB NOT NULL, - payload BLOB, - version INTEGER NOT NULL, - senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, - CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) - ) WITHOUT ROWID; - """, NoParams, void).expect("this is a valid statement") - - let prepareRes = createStmt.exec(()) - if prepareRes.isErr: - return err("failed to exec") + + ## Database initialization - # We dispose of this prepared statement here, as we never use it again - createStmt.dispose() + # Create table (if not exists) + let resCreate = createTable(db) + if resCreate.isErr(): + return err("an error occurred while creating the table: " & resCreate.error()) - ## Reusable prepared statements - let - insertStmt = db.prepareStmt( - "INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);", - (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), - void - ).expect("this is a valid statement") + # Create index on receiverTimestamp (if not exists) + let resIndex = createIndex(db) + if resIndex.isErr(): + return err("Could not establish index on receiverTimestamp: " & resIndex.error()) ## General initialization - let numMessages = messageCount(db).get() + let + totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow) + deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow) + + let numMessages = getMessageCount(db).get() debug "number of messages in sqlite database", messageNum=numMessages - # add index on receiverTimestamp - let - addIndexStmt = "CREATE INDEX IF NOT EXISTS i_rt ON " & TABLE_TITLE & "(receiverTimestamp);" - resIndex = db.query(addIndexStmt, proc(s: ptr sqlite3_stmt) = discard) - if resIndex.isErr: - return err("Could not establish index on receiverTimestamp: " & resIndex.error) + let oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest receiver timestamp should work") - let - storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow) - deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2) + # Reusable prepared statement + let insertStmt = db.prepareInsertMessageStmt() + + let wms = WakuMessageStore( + db: db, + capacity: capacity, + retentionTime: retentionTime, + isSqliteOnly: isSqliteOnly, + totalCapacity: totalCapacity, + deleteWindow: deleteWindow, + insertStmt: insertStmt, + numMessages: int(numMessages), + oldestReceiverTimestamp: oldestReceiverTimestamp + ) - let wms = WakuMessageStore(database: db, - numMessages: int(numMessages), - storeCapacity: storeCapacity, - storeMaxLoad: storeMaxLoad, - deleteWindow: deleteWindow, - isSqliteOnly: isSqliteOnly, - retentionTime: retentionTime, - oldestReceiverTimestamp: db.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works."), - insertStmt: insertStmt) + # If the in-memory store is used and if the loaded db is already over max load, + # delete the oldest messages before returning the WakuMessageStore object + if not isSqliteOnly and wms.numMessages >= wms.totalCapacity: + let res = wms.deleteMessagesOverflowingTotalCapacity() + if res.isErr(): + return err("deleting oldest messages failed: " & res.error()) - # if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object - if not isSqliteOnly and wms.numMessages >= wms.storeMaxLoad: - let res = wms.deleteOldest() - if res.isErr: return err("deleting oldest messages failed: " & res.error()) + # Update oldest timestamp after deleting messages + wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work") + # Update message count after deleting messages + wms.numMessages = wms.capacity + wms.deleteWindow - # if using the sqlite-only store, delete messages exceeding the retention time + # If using the sqlite-only store, delete messages exceeding the retention time if isSqliteOnly: debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp - let res = wms.deleteOldestTime() - if res.isErr: return err("deleting oldest messages (time) failed: " & res.error()) + + let res = wms.deleteMessagesExceedingRetentionTime() + if res.isErr(): + return err("deleting oldest messages (time) failed: " & res.error()) + + # Update oldest timestamp after deleting messages + wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work") + # Update message count after deleting messages + wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work")) + ok(wms) -method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = - ## Adds a message to the storage. - ## - ## **Example:** - ## - ## .. code-block:: - ## let res = db.put(message) - ## if res.isErr: - ## echo "error" - ## +method put*(s: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = + ## Inserts a message into the store - let res = db.insertStmt.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp)) - if res.isErr: - return err("failed") + # Ensure that messages don't "jump" to the front with future timestamps + if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance: + return err("future_sender_timestamp") - db.numMessages += 1 - # if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages - if not db.isSqliteOnly and db.numMessages >= db.storeMaxLoad: - let res = db.deleteOldest() - if res.isErr: return err("deleting oldest failed") + let res = s.insertStmt.exec(( + @(cursor.digest.data), # id + cursor.receiverTime, # receiverTimestamp + toBytes(message.contentTopic), # contentTopic + message.payload, # payload + toBytes(pubsubTopic), # pubsubTopic + int64(message.version), # version + message.timestamp # senderTimestamp + )) + if res.isErr(): + return err("message insert failed: " & res.error()) - if db.isSqliteOnly: + s.numMessages += 1 + + # If the in-memory store is used and if the loaded db is already over max load, delete the oldest messages + if not s.isSqliteOnly and s.numMessages >= s.totalCapacity: + let res = s.deleteMessagesOverflowingTotalCapacity() + if res.isErr(): + return err("deleting oldest failed: " & res.error()) + + # Update oldest timestamp after deleting messages + s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work") + # Update message count after deleting messages + s.numMessages = s.capacity + s.deleteWindow + + if s.isSqliteOnly: # TODO: move to a timer job - # For this experimental version of the new store, it is OK to delete here, because it only actually triggers the deletion if there is a batch of messages older than the threshold. + # For this experimental version of the new store, it is OK to delete here, because it only actually + # triggers the deletion if there is a batch of messages older than the threshold. # This only adds a few simple compare operations, if deletion is not necessary. # Still, the put that triggers the deletion might return with a significant delay. - if db.oldestReceiverTimestamp == 0: db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works.") - let res = db.deleteOldestTime() - if res.isErr: return err("deleting oldest failed") + if s.oldestReceiverTimestamp == 0: + s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work") + + let res = s.deleteMessagesExceedingRetentionTime() + if res.isErr(): + return err("delete messages exceeding the retention time failed: " & res.error()) + + # Update oldest timestamp after deleting messages + s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work") + # Update message count after deleting messages + s.numMessages = int(s.db.getMessageCount().expect("query for oldest timestamp should work")) ok() -method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] = - ## Retrieves `storeCapacity` many messages from the storage. - ## - ## **Example:** - ## - ## .. code-block:: - ## proc data(timestamp: uint64, msg: WakuMessage) = - ## echo cast[string](msg.payload) - ## - ## let res = db.get(data) - ## if res.isErr: - ## echo "error" - var gotMessages = false - proc msg(s: ptr sqlite3_stmt) = - gotMessages = true - let - receiverTimestamp = column_timestamp(s, 0) - topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) - topicLength = sqlite3_column_bytes(s,1) - contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))) - - p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) - length = sqlite3_column_bytes(s, 2) - payload = @(toOpenArray(p, 0, length-1)) - - pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) - pubsubTopicLength = sqlite3_column_bytes(s,3) - pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) - - version = sqlite3_column_int64(s, 4) - - senderTimestamp = column_timestamp(s, 5) +method getAllMessages*(s: WakuMessageStore): MessageStoreResult[seq[MessageStoreRow]] = + ## Retrieve all messages from the store. + s.db.selectAllMessages() - # TODO retrieve the version number - onData(Timestamp(receiverTimestamp), - WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: Timestamp(senderTimestamp)), - pubsubTopic) +method getMessagesByHistoryQuery*( + s: WakuMessageStore, + contentTopic = none(seq[ContentTopic]), + pubsubTopic = none(string), + cursor = none(Index), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = StoreMaxPageSize, + ascendingOrder = true +): MessageStoreResult[MessageStorePage] = + let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize + else: min(maxPageSize, StoreMaxPageSize) - var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & - "FROM " & TABLE_TITLE & " " & - "ORDER BY receiverTimestamp ASC" + let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( + contentTopic, + pubsubTopic, + cursor, + startTime, + endTime, + limit=pageSizeLimit, + ascending=ascendingOrder + ) - # Apply limit. This works because SQLITE will perform the time-based ORDER BY before applying the limit. - selectQuery &= " LIMIT " & $db.storeCapacity & - " OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $db.storeCapacity # offset = total_row_count - limit + if rows.len <= 0: + return ok((@[], none(PagingInfo))) + + var messages = rows.mapIt(it[0]) + + # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message + # Compute last message index + let (message, receivedTimestamp, pubsubTopic) = rows[^1] + let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic) + + let pagingInfo = PagingInfo( + pageSize: uint64(messages.len), + cursor: lastIndex, + direction: if ascendingOrder: PagingDirection.FORWARD + else: PagingDirection.BACKWARD + ) + + # The retrieved messages list should always be in chronological order + if not ascendingOrder: + messages.reverse() + + ok((messages, some(pagingInfo))) + + +proc close*(s: WakuMessageStore) = + ## Close the database connection - let res = db.database.query(selectQuery, msg) - if res.isErr: - return err(res.error) + # Dispose statements + s.insertStmt.dispose() - ok gotMessages - -proc adjustDbPageSize(dbPageSize: uint64, matchCount: uint64, returnPageSize: uint64): uint64 {.inline.} = - const maxDbPageSize: uint64 = 20000 # the maximum DB page size is limited to prevent excessive use of memory in case of very sparse or non-matching filters. TODO: dynamic, adjust to available memory - if dbPageSize >= maxDbPageSize: - return maxDbPageSize - var ret = - if matchCount < 2: dbPageSize * returnPageSize - else: dbPageSize * (returnPageSize div matchCount) - ret = min(ret, maxDbPageSize) - trace "dbPageSize adjusted to: ", ret - ret - - -method getPage*(db: WakuMessageStore, - pred: QueryFilterMatcher, - pagingInfo: PagingInfo): - MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] = - ## Get a single page of history matching the predicate and - ## adhering to the pagingInfo parameters - - trace "getting page from SQLite DB", pagingInfo=pagingInfo - - let - responsePageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos - else: pagingInfo.pageSize - - var dbPageSize = responsePageSize # we retrieve larger pages from the DB for queries with (sparse) filters (TODO: improve adaptive dbPageSize increase) - - var cursor = pagingInfo.cursor - - var messages: seq[WakuMessage] - var - lastIndex: Index - numRecordsVisitedPage: uint64 = 0 # number of DB records visited during retrieving the last page from the DB - numRecordsVisitedTotal: uint64 = 0 # number of DB records visited in total - numRecordsMatchingPred: uint64 = 0 # number of records that matched the predicate on the last DB page; we use this as to gauge the sparseness of rows matching the filter. - - proc msg(s: ptr sqlite3_stmt) = # this is the actual onData proc that is passed to the query proc (the message store adds one indirection) - if uint64(messages.len) >= responsePageSize: return - let - receiverTimestamp = column_timestamp(s, 0) - - topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) - topicLength = sqlite3_column_bytes(s,1) - contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))) - - p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) - length = sqlite3_column_bytes(s, 2) - payload = @(toOpenArray(p, 0, length-1)) - - pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) - pubsubTopicLength = sqlite3_column_bytes(s,3) - pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) - - version = sqlite3_column_int64(s, 4) - - senderTimestamp = column_timestamp(s, 5) - retMsg = WakuMessage(contentTopic: contentTopic, payload: payload, version: uint32(version), timestamp: Timestamp(senderTimestamp)) - # TODO: we should consolidate WakuMessage, Index, and IndexedWakuMessage; reason: avoid unnecessary copying and recalculation - index = retMsg.computeIndex(receiverTimestamp, pubsubTopic) # TODO: retrieve digest from DB - indexedWakuMsg = IndexedWakuMessage(msg: retMsg, index: index, pubsubTopic: pubsubTopic) # TODO: constructing indexedWakuMsg requires unnecessary copying - - lastIndex = index - numRecordsVisitedPage += 1 - try: - if pred(indexedWakuMsg): #TODO throws unknown exception - numRecordsMatchingPred += 1 - messages.add(retMsg) - except: - # TODO properly handle this exception - quit 1 - - # TODO: deduplicate / condense the following 4 DB query strings - # If no index has been set in pagingInfo, start with the first message (or the last in case of backwards direction) - if cursor == Index(): ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! - let noCursorQuery = - if pagingInfo.direction == PagingDirection.FORWARD: - "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & - "FROM " & TABLE_TITLE & " " & - "ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " & - "LIMIT " & $dbPageSize & ";" - else: - "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & - "FROM " & TABLE_TITLE & " " & - "ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " & - "LIMIT " & $dbPageSize & ";" - - let res = db.database.query(noCursorQuery, msg) - if res.isErr: - return err("failed to execute SQLite query: noCursorQuery") - numRecordsVisitedTotal = numRecordsVisitedPage - numRecordsVisitedPage = 0 - dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize) - numRecordsMatchingPred = 0 - cursor = lastIndex - - let preparedPageQuery = if pagingInfo.direction == PagingDirection.FORWARD: - db.database.prepareStmt( - "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & - "FROM " & TABLE_TITLE & " " & - "WHERE (senderTimestamp, id, pubsubTopic) > (?, ?, ?) " & - "ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " & - "LIMIT ?;", - (Timestamp, seq[byte], seq[byte], int64), # TODO: uint64 not supported yet - (Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), - ).expect("this is a valid statement") - else: - db.database.prepareStmt( - "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & - "FROM " & TABLE_TITLE & " " & - "WHERE (senderTimestamp, id, pubsubTopic) < (?, ?, ?) " & - "ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " & - "LIMIT ?;", - (Timestamp, seq[byte], seq[byte], int64), - (Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), - ).expect("this is a valid statement") - - - # TODO: DoS attack mitigation against: sending a lot of queries with sparse (or non-matching) filters making the store node run through the whole DB. Even worse with pageSize = 1. - while uint64(messages.len) < responsePageSize: - let res = preparedPageQuery.exec((cursor.senderTime, @(cursor.digest.data), cursor.pubsubTopic.toBytes(), dbPageSize.int64), msg) # TODO support uint64, pages large enough to cause an overflow are not expected... - if res.isErr: - return err("failed to execute SQLite prepared statement: preparedPageQuery") - numRecordsVisitedTotal += numRecordsVisitedPage - if numRecordsVisitedPage == 0: break # we are at the end of the DB (find more efficient/integrated solution to track that event) - numRecordsVisitedPage = 0 - cursor = lastIndex - dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize) - numRecordsMatchingPred = 0 - - let outPagingInfo = PagingInfo(pageSize: messages.len.uint, - cursor: lastIndex, - direction: pagingInfo.direction) - - let historyResponseError = if numRecordsVisitedTotal == 0: HistoryResponseError.INVALID_CURSOR # Index is not in DB (also if queried Index points to last entry) - else: HistoryResponseError.NONE - - preparedPageQuery.dispose() - - return ok((messages, outPagingInfo, historyResponseError)) # TODO: historyResponseError is not a "real error": treat as a real error - - - - -method getPage*(db: WakuMessageStore, - pagingInfo: PagingInfo): - MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] = - ## Get a single page of history without filtering. - ## Adhere to the pagingInfo parameters - - proc predicate(i: IndexedWakuMessage): bool = true # no filtering - - return getPage(db, predicate, pagingInfo) - - - - -proc close*(db: WakuMessageStore) = - ## Closes the database. - db.insertStmt.dispose() - db.database.close() + # Close connection + s.db.close() diff --git a/waku/v2/node/storage/message/waku_message_store_queries.nim b/waku/v2/node/storage/message/waku_message_store_queries.nim new file mode 100644 index 000000000..a97a46588 --- /dev/null +++ b/waku/v2/node/storage/message/waku_message_store_queries.nim @@ -0,0 +1,367 @@ +{.push raises: [Defect].} + +import + std/[options, sequtils], + stew/[results, byteutils], + sqlite3_abi +import + ../sqlite, + ../../../protocol/waku_message, + ../../../utils/pagination, + ../../../utils/time + + +const DbTable = "Message" + +type SqlQueryStr = string + + +### SQLite column helper methods + +proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage {.inline.} = + let + topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol)) + topicLength = sqlite3_column_bytes(s, 1) + contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))) + let + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol)) + length = sqlite3_column_bytes(s, 2) + payload = @(toOpenArray(p, 0, length-1)) + let version = sqlite3_column_int64(s, versionCol) + let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol) + + WakuMessage( + contentTopic: ContentTopic(contentTopic), + payload: payload , + version: uint32(version), + timestamp: Timestamp(senderTimestamp) + ) + +proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp {.inline.} = + let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol) + Timestamp(receiverTimestamp) + +proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string {.inline.} = + let + pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol)) + pubsubTopicLength = sqlite3_column_bytes(s, 3) + pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) + + pubsubTopic + + + +### SQLite queries + +## Create table + +template createTableQuery(table: string): SqlQueryStr = + "CREATE TABLE IF NOT EXISTS " & table & " (" & + " id BLOB," & + " receiverTimestamp INTEGER NOT NULL," & + " contentTopic BLOB NOT NULL," & + " pubsubTopic BLOB NOT NULL," & + " payload BLOB," & + " version INTEGER NOT NULL," & + " senderTimestamp INTEGER NOT NULL," & + " CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" & + ") WITHOUT ROWID;" + +proc createTable*(db: SqliteDatabase): DatabaseResult[void] {.inline.} = + let query = createTableQuery(DbTable) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Create index + +template createIndexQuery(table: string): SqlQueryStr = + "CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);" + +proc createIndex*(db: SqliteDatabase): DatabaseResult[void] {.inline.} = + let query = createIndexQuery(DbTable) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Insert message +type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) + +template insertMessageQuery(table: string): SqlQueryStr = + "INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" & + " VALUES (?, ?, ?, ?, ?, ?, ?);" + +proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = + let query = insertMessageQuery(DbTable) + db.prepareStmt( query, InsertMessageParams, void).expect("this is a valid statement") + + +## Count table messages + +template countMessagesQuery(table: string): SqlQueryStr = + "SELECT COUNT(*) FROM " & table + +proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] {.inline.} = + var count: int64 + proc queryRowCallback(s: ptr sqlite3_stmt) = + count = sqlite3_column_int64(s, 0) + + let query = countMessagesQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err("failed to count number of messages in the database") + + ok(count) + + +## Get oldest receiver timestamp + +template selectOldestMessageTimestampQuery(table: string): SqlQueryStr = + "SELECT MIN(receiverTimestamp) FROM " & table + +proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= + var timestamp: Timestamp + proc queryRowCallback(s: ptr sqlite3_stmt) = + timestamp = queryRowReceiverTimestampCallback(s, 0) + + let query = selectOldestMessageTimestampQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err("failed to get the oldest receiver timestamp from the database") + + ok(timestamp) + + +## Delete messages older than timestamp + +template deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr = + "DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts + +proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] {.inline.} = + let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Delete oldest messages not within limit + +template deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr = + "DELETE FROM " & table & " WHERE id NOT IN (" & + " SELECT id FROM " & table & + " ORDER BY receiverTimestamp DESC" & + " LIMIT " & $limit & + ");" + +proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] {.inline.} = + # NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit + let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Select all messages + +template selectAllMessagesQuery(table: string): SqlQueryStr = + "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" & + " FROM " & table & + " ORDER BY receiverTimestamp ASC" + +proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] = + ## Retrieve all messages from the store. + var rows: seq[(Timestamp, WakuMessage, string)] + proc queryRowCallback(s: ptr sqlite3_stmt) = + let + receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0) + wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) + pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) + + rows.add((receiverTimestamp, wakuMessage, pubsubTopic)) + + let query = selectAllMessagesQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err(res.error()) + + ok(rows) + + +## Select messages by history query with limit + +proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[string] = + if contentTopic.isNone(): + return none(string) + + let topic = contentTopic.get() + if topic.len <= 0: + return none(string) + + var contentTopicWhere = "(" + contentTopicWhere &= "contentTopic = (?)" + for _ in topic[1..^1]: + contentTopicWhere &= " OR contentTopic = (?)" + contentTopicWhere &= ")" + some(contentTopicWhere) + +proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] = + if cursor.isNone(): + return none(string) + + let comp = if ascending: ">" else: "<" + let whereClause = "(senderTimestamp, id, pubsubTopic) " & comp & " (?, ?, ?)" + some(whereClause) + +proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] = + if pubsubTopic.isNone(): + return none(string) + + some("pubsubTopic = (?)") + +proc timeRangeWhereClause(startTime: Option[Timestamp], endTime: Option[Timestamp]): Option[string] = + if startTime.isNone() and endTime.isNone(): + return none(string) + + var where = "(" + if startTime.isSome(): + where &= "senderTimestamp >= (?)" + if startTime.isSome() and endTime.isSome(): + where &= " AND " + if endTime.isSome(): + where &= "senderTimestamp <= (?)" + where &= ")" + some(where) + +proc whereClause(clauses: varargs[Option[string]]): Option[string] = + if clauses.len <= 0 or @clauses.all(proc(clause: Option[string]): bool= clause.isNone()): + return none(string) + + let whereList = @clauses + .filter(proc(clause: Option[string]): bool= clause.isSome()) + .map(proc(clause: Option[string]): string = clause.get()) + + var where: string = whereList[0] + for clause in whereList[1..^1]: + where &= " AND " & clause + some(where) + +proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: uint64, ascending=true): SqlQueryStr = + let order = if ascending: "ASC" else: "DESC" + + var query: string + + query = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" + query &= " FROM " & table + + if where.isSome(): + query &= " WHERE " & where.get() + + query &= " ORDER BY senderTimestamp " & order & ", id " & order & ", pubsubTopic " & order & ", receiverTimestamp " & order + query &= " LIMIT " & $limit & ";" + + query + +proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): DatabaseResult[SqliteStmt[void, void]] = + var s: RawStmtPtr + checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil) + ok(SqliteStmt[void, void](s)) + +proc execSelectMessagesWithLimitStmt(s: SqliteStmt, + contentTopic: Option[seq[ContentTopic]], + pubsubTopic: Option[string], + cursor: Option[Index], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + onRowCallback: DataProc): DatabaseResult[void] = + let s = RawStmtPtr(s) + + # Bind params + var paramIndex = 1 + if contentTopic.isSome(): + for topic in contentTopic.get(): + let topicBlob = toBytes(topic) + checkErr bindParam(s, paramIndex, topicBlob) + paramIndex += 1 + + if cursor.isSome(): # cursor = senderTimestamp, id, pubsubTopic + let senderTimestamp = cursor.get().senderTime + checkErr bindParam(s, paramIndex, senderTimestamp) + paramIndex += 1 + + let id = @(cursor.get().digest.data) + checkErr bindParam(s, paramIndex, id) + paramIndex += 1 + + let pubsubTopic = toBytes(cursor.get().pubsubTopic) + checkErr bindParam(s, paramIndex, pubsubTopic) + paramIndex += 1 + + if pubsubTopic.isSome(): + let pubsubTopic = toBytes(pubsubTopic.get()) + checkErr bindParam(s, paramIndex, pubsubTopic) + paramIndex += 1 + + if startTime.isSome(): + let time = startTime.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + + if endTime.isSome(): + let time = endTime.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + + + try: + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + onRowCallback(s) + of SQLITE_DONE: + return ok() + else: + return err($sqlite3_errstr(v)) + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + +proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, + contentTopic: Option[seq[ContentTopic]], + pubsubTopic: Option[string], + cursor: Option[Index], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + limit: uint64, + ascending: bool): DatabaseResult[seq[(WakuMessage, Timestamp, string)]] = + + + var messages: seq[(WakuMessage, Timestamp, string)] = @[] + proc queryRowCallback(s: ptr sqlite3_stmt) = + let + receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0) + message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) + pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) + + messages.add((message, receiverTimestamp, pubsubTopic)) + + let query = block: + let + contentTopicClause = contentTopicWhereClause(contentTopic) + cursorClause = cursorWhereClause(cursor, ascending) + pubsubClause = pubsubWhereClause(pubsubTopic) + timeRangeClause = timeRangeWhereClause(startTime, endTime) + let where = whereClause(contentTopicClause, cursorClause, pubsubClause, timeRangeClause) + selectMessagesWithLimitQuery(DbTable, where, limit, ascending) + + let dbStmt = ?db.prepareSelectMessagesWithlimitStmt(query) + ?dbStmt.execSelectMessagesWithLimitStmt( + contentTopic, + pubsubTopic, + cursor, + startTime, + endTime, + queryRowCallback + ) + dbStmt.dispose() + + ok(messages) diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 85d814cdb..20d2460c5 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -12,16 +12,19 @@ import ./message_store -# TODO: Remove after resolving nwaku #1026 -export - message_store - - logScope: topics = "message_store.storequeue" +type + IndexedWakuMessage* = object + # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message + ## This type is used to encapsulate a WakuMessage and its Index + msg*: WakuMessage + index*: Index + pubsubTopic*: string + + QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} -type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} type StoreQueueRef* = ref object of MessageStore @@ -298,17 +301,9 @@ proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = return ok(res.value.data) + ## --- Queue API --- -method getMostRecentMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] = - let message = ?store.last() - ok(message.index.receiverTime) - -method getOldestMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] = - let message = ?store.first() - ok(message.index.receiverTime) - - proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] = ## Add a message to the queue ## diff --git a/waku/v2/node/storage/sqlite.nim b/waku/v2/node/storage/sqlite.nim index acc7d63a1..8a63e48b7 100644 --- a/waku/v2/node/storage/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -20,7 +20,7 @@ type Sqlite = ptr sqlite3 NoParams* = tuple - RawStmtPtr = ptr sqlite3_stmt + RawStmtPtr* = ptr sqlite3_stmt SqliteStmt*[Params; Result] = distinct RawStmtPtr AutoDisposed[T: ptr|ref] = object @@ -267,18 +267,19 @@ proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] = ok(version) -proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[bool] = +proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[void] = ## sets the value of the user-version integer at offset 60 in the database header. ## some context borrowed from https://www.sqlite.org/pragma.html#pragma_user_version ## The user-version is an integer that is available to applications to use however they want. ## SQLite makes no use of the user-version itself - proc handler(s: ptr sqlite3_stmt) = - discard + proc handler(s: ptr sqlite3_stmt) = discard + let query = "PRAGMA user_version=" & $version & ";" let res = database.query(query, handler) - if res.isErr: + if res.isErr(): return err("failed to set user_version") - ok(true) + + ok() proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration_utils.USER_VERSION): DatabaseResult[bool] = diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index d130f58f2..eed9689a5 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -448,7 +448,7 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.ra # NYI - Do we need this? #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) -proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity, isSqliteOnly = false) {.raises: [Defect, LPError].} = +proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, isSqliteOnly = false) {.raises: [Defect, LPError].} = info "mounting store" if node.wakuSwap.isNil: diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 0a519c5d7..ccb3ed62d 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -5,7 +5,7 @@ import std/[tables, times, sequtils, options, math], - stew/[results, byteutils], + stew/results, chronicles, chronos, bearssl, @@ -35,29 +35,29 @@ declarePublicGauge waku_store_queries, "number of store queries received" logScope: topics = "wakustore" +const + WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" + + DefaultTopic* = "/waku/2/default-waku/proto" -const # Constants required for pagination ------------------------------------------- MaxPageSize* = StoreMaxPageSize # TODO the DefaultPageSize can be changed, it's current value is random DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page - MaxRpcSize* = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead - MaxTimeVariance* = StoreMaxTimeVariance - DefaultTopic* = "/waku/2/default-waku/proto" +const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead -const - WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" - DefaultStoreCapacity* = 50_000 # Default maximum of 50k messages stored # Error types (metric label values) const dialFailure = "dial_failure" decodeRpcFailure = "decode_rpc_failure" + peerNotFoundFailure = "peer_not_found_failure" + type WakuStoreResult*[T] = Result[T, string] @@ -85,52 +85,51 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} else: none(seq[ContentTopic]) qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic) else: none(string) + qCursor = if query.pagingInfo.cursor != Index(): some(query.pagingInfo.cursor) + else: none(Index) qStartTime = if query.startTime != Timestamp(0): some(query.startTime) else: none(Timestamp) qEndTime = if query.endTime != Timestamp(0): some(query.endTime) else: none(Timestamp) + qMaxPageSize = query.pagingInfo.pageSize + qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD - trace "Combined query criteria into single predicate", contentTopics=qContentTopics, pubsubTopic=qPubSubTopic, startTime=qStartTime, endTime=qEndTime + let queryRes = block: + if w.isSqliteOnly: + w.store.getMessagesByHistoryQuery( + contentTopic = qContentTopics, + pubsubTopic = qPubSubTopic, + cursor = qCursor, + startTime = qStartTime, + endTime = qEndTime, + maxPageSize = qMaxPageSize, + ascendingOrder = qAscendingOrder + ) + else: + w.messages.getMessagesByHistoryQuery( + contentTopic = qContentTopics, + pubsubTopic = qPubSubTopic, + cursor = qCursor, + startTime = qStartTime, + endTime = qEndTime, + maxPageSize = qMaxPageSize, + ascendingOrder = qAscendingOrder + ) - ## Compose filter predicate for message from query criteria - proc matchesQuery(indMsg: IndexedWakuMessage): bool = - trace "Matching indexed message against predicate", msg=indMsg +# Build response + # TODO: Handle errors + if queryRes.isErr(): + return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR) - if qPubSubTopic.isSome(): - # filter on pubsub topic - if indMsg.pubsubTopic != qPubSubTopic.get(): - trace "Failed to match pubsub topic", criteria=qPubSubTopic.get(), actual=indMsg.pubsubTopic - return false - - if qStartTime.isSome() and qEndTime.isSome(): - # temporal filtering - # select only messages whose sender generated timestamps fall bw the queried start time and end time - - if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get(): - trace "Failed to match temporal filter", criteriaStart=qStartTime.get(), criteriaEnd=qEndTime.get(), actual=indMsg.msg.timestamp - return false - - if qContentTopics.isSome(): - # filter on content - if indMsg.msg.contentTopic notin qContentTopics.get(): - trace "Failed to match content topic", criteria=qContentTopics.get(), actual=indMsg.msg.contentTopic - return false - - return true - - let - # Read a page of history matching the query - (wakuMsgList, updatedPagingInfo, error) = - if w.isSqliteOnly: w.store.getPage(matchesQuery, query.pagingInfo).expect("should return a valid result set") # TODO: error handling - else: w.messages.getPage(matchesQuery, query.pagingInfo) - - # Build response - historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error) + let (messages, updatedPagingInfo) = queryRes.get() - trace "Successfully populated a history response", response=historyRes - return historyRes + HistoryResponse( + messages: messages, + pagingInfo: updatedPagingInfo.get(PagingInfo()), + error: HistoryResponseError.NONE + ) -proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = +proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = proc handler(conn: Connection, proto: string) {.async.} = var message = await conn.readLp(MaxRpcSize.int) @@ -174,29 +173,25 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = info "SQLite-only store initialized. Messages are *not* loaded into memory." return - proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) = - # TODO index should not be recalculated - discard ws.messages.add(IndexedWakuMessage( - msg: msg, - index: Index.compute(msg, receiverTime, pubsubTopic), - pubsubTopic: pubsubTopic - )) - + # Load all messages from sqliteStore into queueStore info "attempting to load messages from persistent storage" - let res = ws.store.getAll(onData) - if res.isErr: - warn "failed to load messages from store", err = res.error - waku_store_errors.inc(labelValues = ["store_load_failure"]) - else: - info "successfully loaded from store" - + let res = ws.store.getAllMessages() + if res.isOk(): + for (receiverTime, msg, pubsubTopic) in res.value: + let index = Index.compute(msg, receiverTime, pubsubTopic) + discard ws.messages.put(index, msg, pubsubTopic) + + info "successfully loaded messages from the persistent store" + else: + warn "failed to load messages from the persistent store", err = res.error() + debug "the number of messages in the memory", messageNum=ws.messages.len waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, - capacity = DefaultStoreCapacity, isSqliteOnly = false): T = + capacity = StoreDefaultCapacity, isSqliteOnly = false): T = debug "init" var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly) output.init(capacity) @@ -240,6 +235,10 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = trace "failed to store messages", err = res.error waku_store_errors.inc(labelValues = ["store_failure"]) + +# TODO: Remove after converting the query method into a non-callback method +type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} + proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. # Ideally depending on the query and our set of peers we take a subset of ideal peers. @@ -252,7 +251,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn if peerOpt.isNone(): error "no suitable remote peers" - waku_store_errors.inc(labelValues = [dialFailure]) + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) return let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec) @@ -449,7 +448,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): warn "no suitable remote peers" - waku_store_errors.inc(labelValues = [dialFailure]) + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) return err("no suitable remote peers") debug "a peer is selected from peer manager" @@ -476,7 +475,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand if peerOpt.isNone(): error "no suitable remote peers" - waku_store_errors.inc(labelValues = [dialFailure]) + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) return let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec) @@ -507,10 +506,3 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) - - -# TODO: Remove the following deprecated method -proc computeIndex*(msg: WakuMessage, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = DefaultTopic): Index {.deprecated: "Use Index.compute() instead".}= - Index.compute(msg, receivedTime, pubsubTopic)