diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index b568cdcf8..23cb4b05e 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -1,19 +1,32 @@ import - # Waku common tests + # Waku common test suite ./v2/test_envvar_serialization, ./v2/test_confutils_envvar, ./v2/test_sqlite_migrations +import + # Waku archive test suite + ./v2/waku_archive/test_driver_queue_index, + ./v2/waku_archive/test_driver_queue_pagination, + ./v2/waku_archive/test_driver_queue_query, + ./v2/waku_archive/test_driver_queue, + ./v2/waku_archive/test_driver_sqlite_query, + ./v2/waku_archive/test_driver_sqlite, + ./v2/waku_archive/test_retention_policy, + ./v2/waku_archive/test_waku_archive, + + # TODO: Remove with the implementation + ./v2/test_message_store_queue_index, + ./v2/test_message_store_queue_pagination, + ./v2/test_message_store_queue, + ./v2/test_message_store_sqlite_query, + ./v2/test_message_store_sqlite + import # Waku v2 tests ./v2/test_wakunode, ./v2/test_wakunode_relay, # Waku Store - ./v2/test_message_store_queue_index, - ./v2/test_message_store_queue_pagination, - ./v2/test_message_store_queue, - ./v2/test_message_store_sqlite_query, - ./v2/test_message_store_sqlite, ./v2/test_waku_store_rpc_codec, ./v2/test_waku_store, ./v2/test_waku_store_client, @@ -53,7 +66,7 @@ import ./v2/test_utils_keyfile when defined(rln): - import + import ./v2/test_waku_rln_relay, ./v2/test_wakunode_rln_relay when defined(onchain_rln): diff --git a/tests/v2/test_sqlite_migrations.nim b/tests/v2/test_sqlite_migrations.nim index 2c0fe268f..1c0969708 100644 --- a/tests/v2/test_sqlite_migrations.nim +++ b/tests/v2/test_sqlite_migrations.nim @@ -5,13 +5,38 @@ import stew/results, testutils/unittests import + ../../waku/common/sqlite/database, ../../waku/common/sqlite/migrations {.all.} + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.new(":memory:").tryGet() + template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] suite "SQLite - migrations": + test 
"set and get user version": + ## Given + let database = newTestDatabase() + + ## When + let setRes = database.setUserVersion(5) + let getRes = database.getUserVersion() + + ## Then + check: + setRes.isOk() + getRes.isOk() + + let version = getRes.tryGet() + check: + version == 5 + + ## Cleanup + database.close() + test "filter and order migration script file paths": ## Given let paths = @[ @@ -64,7 +89,7 @@ suite "SQLite - migrations": ## When let statements = script.breakIntoStatements() - + ## Then check: statements == @[statement1, statement2] @@ -91,5 +116,5 @@ suite "SQLite - migrations": let statements = script.breakIntoStatements() ## Then - check: + check: statements == @[statement1, statement2] diff --git a/tests/v2/waku_archive/test_driver_queue.nim b/tests/v2/waku_archive/test_driver_queue.nim new file mode 100644 index 000000000..0aedc4b4b --- /dev/null +++ b/tests/v2/waku_archive/test_driver_queue.nim @@ -0,0 +1,186 @@ +{.used.} + +import + std/options, + stew/results, + testutils/unittests +import + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver/queue_driver {.all.}, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver/index, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/utils/time + + +# Helper functions + +proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = + ## Use i to generate an IndexedWakuMessage + var data {.noinit.}: array[32, byte] + for x in data.mitems: x = i.byte + + let + message = WakuMessage(payload: @[byte i], timestamp: Timestamp(i)) + cursor = Index( + receiverTime: Timestamp(i), + senderTime: Timestamp(i), + digest: MessageDigest(data: data), + pubsubTopic: "test-pubsub-topic" + ) + + IndexedWakuMessage(msg: message, index: cursor) + +proc getPrepopulatedTestQueue(unsortedSet: auto, capacity: int): QueueDriver = + let driver = QueueDriver.new(capacity) + + for i in unsortedSet: + let message = genIndexedWakuMessage(i.int8) + discard 
driver.add(message) + + driver + + +procSuite "Sorted driver queue": + + test "queue capacity - add a message over the limit": + ## Given + let capacity = 5 + let driver = QueueDriver.new(capacity) + + ## When + # Fill up the queue + for i in 1..capacity: + let message = genIndexedWakuMessage(i.int8) + require(driver.add(message).isOk()) + + # Add one more. Capacity should not be exceeded + let message = genIndexedWakuMessage(capacity.int8 + 1) + require(driver.add(message).isOk()) + + ## Then + check: + driver.len == capacity + + test "queue capacity - add message older than oldest in the queue": + ## Given + let capacity = 5 + let driver = QueueDriver.new(capacity) + + ## When + # Fill up the queue + for i in 1..capacity: + let message = genIndexedWakuMessage(i.int8) + require(driver.add(message).isOk()) + + # Attempt to add message with older value than oldest in queue should fail + let + oldestTimestamp = driver.first().get().index.senderTime + message = genIndexedWakuMessage(oldestTimestamp.int8 - 1) + addRes = driver.add(message) + + ## Then + check: + addRes.isErr() + addRes.error() == "too_old" + + check: + driver.len == capacity + + test "queue sort-on-insert": + ## Given + let + capacity = 5 + unsortedSet = [5,1,3,2,4] + let driver = getPrepopulatedTestQueue(unsortedSet, capacity) + + # Walk forward through the set and verify ascending order + var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index + for i in driver.fwdIterator: + let (index, _) = i + check cmp(index, prevSmaller) > 0 + prevSmaller = index + + # Walk backward through the set and verify descending order + var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index + for i in driver.bwdIterator: + let (index, _) = i + check cmp(index, prevLarger) < 0 + prevLarger = index + + test "access first item from queue": + ## Given + let + capacity = 5 + unsortedSet = [5,1,3,2,4] + let driver = getPrepopulatedTestQueue(unsortedSet, capacity) + + ## When + let firstRes = 
driver.first() + + ## Then + check: + firstRes.isOk() + + let first = firstRes.tryGet() + check: + first.msg.timestamp == Timestamp(1) + + test "get first item from empty queue should fail": + ## Given + let capacity = 5 + let driver = QueueDriver.new(capacity) + + ## When + let firstRes = driver.first() + + ## Then + check: + firstRes.isErr() + firstRes.error() == "Not found" + + test "access last item from queue": + ## Given + let + capacity = 5 + unsortedSet = [5,1,3,2,4] + let driver = getPrepopulatedTestQueue(unsortedSet, capacity) + + ## When + let lastRes = driver.last() + + ## Then + check: + lastRes.isOk() + + let last = lastRes.tryGet() + check: + last.msg.timestamp == Timestamp(5) + + test "get last item from empty queue should fail": + ## Given + let capacity = 5 + let driver = QueueDriver.new(capacity) + + ## When + let lastRes = driver.last() + + ## Then + check: + lastRes.isErr() + lastRes.error() == "Not found" + + test "verify if queue contains an index": + ## Given + let + capacity = 5 + unsortedSet = [5,1,3,2,4] + let driver = getPrepopulatedTestQueue(unsortedSet, capacity) + + let + existingIndex = genIndexedWakuMessage(4).index + nonExistingIndex = genIndexedWakuMessage(99).index + + ## Then + check: + driver.contains(existingIndex) == true + driver.contains(nonExistingIndex) == false diff --git a/tests/v2/waku_archive/test_driver_queue_index.nim b/tests/v2/waku_archive/test_driver_queue_index.nim new file mode 100644 index 000000000..7b5dfce48 --- /dev/null +++ b/tests/v2/waku_archive/test_driver_queue_index.nim @@ -0,0 +1,182 @@ +{.used.} + +import + std/times, + stew/byteutils, + testutils/unittests, + nimcrypto +import + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver/index, + ../../../waku/v2/utils/time, + ../testlib/common + + +## Helpers + +proc getTestTimestamp(offset=0): Timestamp = + let now = getNanosecondTime(epochTime() + float(offset)) + Timestamp(now) + +proc hashFromStr(input: 
string): MDigest[256] = + var ctx: sha256 + + ctx.init() + ctx.update(input.toBytes()) + let hashed = ctx.finish() + ctx.clear() + + return hashed + + +suite "Queue Driver - index": + + ## Test vars + let + smallIndex1 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000)) + smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000)) + largeIndex1 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1 + largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1 + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000)) + eqIndex1 = Index(digest: hashFromStr("0003"), + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(54321)) + eqIndex2 = Index(digest: hashFromStr("0003"), + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(54321)) + eqIndex3 = Index(digest: hashFromStr("0003"), + receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons + senderTime: getNanosecondTime(54321)) + diffPsTopic = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000), + pubsubTopic: "zzzz") + noSenderTime1 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(1100), + senderTime: getNanosecondTime(0), + pubsubTopic: "zzzz") + noSenderTime2 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(10000), + senderTime: getNanosecondTime(0), + pubsubTopic: "zzzz") + noSenderTime3 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(1200), + senderTime: getNanosecondTime(0), + pubsubTopic: "aaaa") + noSenderTime4 = Index(digest: hashFromStr("0"), + receiverTime: getNanosecondTime(1200), + 
senderTime: getNanosecondTime(0), + pubsubTopic: "zzzz") + + test "Index comparison": + # Index comparison with senderTime diff + check: + cmp(smallIndex1, largeIndex1) < 0 + cmp(smallIndex2, largeIndex1) < 0 + + # Index comparison with digest diff + check: + cmp(smallIndex1, smallIndex2) < 0 + cmp(smallIndex1, largeIndex2) < 0 + cmp(smallIndex2, largeIndex2) > 0 + cmp(largeIndex1, largeIndex2) > 0 + + # Index comparison when equal + check: + cmp(eqIndex1, eqIndex2) == 0 + + # pubsubTopic difference + check: + cmp(smallIndex1, diffPsTopic) < 0 + + # receiverTime diff plays no role when senderTime set + check: + cmp(eqIndex1, eqIndex3) == 0 + + # receiverTime diff plays no role when digest/pubsubTopic equal + check: + cmp(noSenderTime1, noSenderTime2) == 0 + + # sort on receiverTime with no senderTimestamp and unequal pubsubTopic + check: + cmp(noSenderTime1, noSenderTime3) < 0 + + # sort on receiverTime with no senderTimestamp and unequal digest + check: + cmp(noSenderTime1, noSenderTime4) < 0 + + # sort on receiverTime if no senderTimestamp on only one side + check: + cmp(smallIndex1, noSenderTime1) < 0 + cmp(noSenderTime1, smallIndex1) > 0 # Test symmetry + cmp(noSenderTime2, eqIndex3) < 0 + cmp(eqIndex3, noSenderTime2) > 0 # Test symmetry + + test "Index equality": + # Exactly equal + check: + eqIndex1 == eqIndex2 + + # Receiver time plays no role, even without sender time + check: + eqIndex1 == eqIndex3 + noSenderTime1 == noSenderTime2 # only receiver time differs, indices are equal + noSenderTime1 != noSenderTime3 # pubsubTopics differ + noSenderTime1 != noSenderTime4 # digests differ + + # Unequal sender time + check: + smallIndex1 != largeIndex1 + + # Unequal digest + check: + smallIndex1 != smallIndex2 + + # Unequal hash and digest + check: + smallIndex1 != eqIndex1 + + # Unequal pubsubTopic + check: + smallIndex1 != diffPsTopic + + test "Index computation should not be empty": + ## Given + let ts = getTestTimestamp() + let wm = WakuMessage(payload: @[byte 
1, 2, 3], timestamp: ts) + + ## When + let ts2 = getTestTimestamp() + 10 + let index = Index.compute(wm, ts2, DefaultContentTopic) + + ## Then + check: + index.digest.data.len != 0 + index.digest.data.len == 32 # sha2 output length in bytes + index.receiverTime == ts2 # the receiver timestamp should be a non-zero value + index.senderTime == ts + index.pubsubTopic == DefaultContentTopic + + test "Index digest of two identical messsage should be the same": + ## Given + let topic = ContentTopic("test-content-topic") + let + wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) + wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) + + ## When + let ts = getTestTimestamp() + let + index1 = Index.compute(wm1, ts, DefaultPubsubTopic) + index2 = Index.compute(wm2, ts, DefaultPubsubTopic) + + ## Then + check: + index1.digest == index2.digest diff --git a/tests/v2/waku_archive/test_driver_queue_pagination.nim b/tests/v2/waku_archive/test_driver_queue_pagination.nim new file mode 100644 index 000000000..3a6ffa898 --- /dev/null +++ b/tests/v2/waku_archive/test_driver_queue_pagination.nim @@ -0,0 +1,403 @@ +{.used.} + +import + std/[options, sequtils, algorithm], + testutils/unittests, + libp2p/protobuf/minprotobuf +import + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver/queue_driver {.all.}, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver/index, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/utils/time, + ../testlib/common + + +proc getTestQueueDriver(numMessages: int): QueueDriver = + let testQueueDriver = QueueDriver.new(numMessages) + + var data {.noinit.}: array[32, byte] + for x in data.mitems: x = 1 + + for i in 0.. 
bool: + let (pubsubTopic, msg, digest, storeTimestamp) = item + msg.contentTopic == contentTopic and + pubsubTopic == DefaultPubsubTopic + + ## Cleanup + driver.close().expect("driver to close") diff --git a/tests/v2/waku_archive/test_driver_sqlite_query.nim b/tests/v2/waku_archive/test_driver_sqlite_query.nim new file mode 100644 index 000000000..3de694178 --- /dev/null +++ b/tests/v2/waku_archive/test_driver_sqlite_query.nim @@ -0,0 +1,1219 @@ +{.used.} + +import + std/[options, sequtils], + testutils/unittests, + chronos +import + ../../../waku/common/sqlite, + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/sqlite_driver, + ../../../waku/v2/protocol/waku_message, + ../utils, + ../testlib/common + + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.new(":memory:").tryGet() + +proc newTestSqliteDriver(): ArchiveDriver = + let db = newTestDatabase() + SqliteDriver.new(db).tryGet() + +proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor = + ArchiveCursor( + pubsubTopic: pubsubTopic, + senderTime: message.timestamp, + storeTime: message.timestamp, + digest: computeDigest(message) + ) + + +suite "SQLite driver - query by content topic": + + test "no content topic": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)), + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + + for msg in messages: + require 
driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + maxPageSize=5, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 5 + filteredMessages == messages[0..4] + + ## Cleanup + driver.close().expect("driver to close") + + test "single content topic": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(ts=ts(00)), + fakeWakuMessage(ts=ts(10)), + + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[2..3] + + ## Cleanup + driver.close().expect("driver to close") + + test "single content topic - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(ts=ts(00)), + fakeWakuMessage(ts=ts(10)), + + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(50)), + + fakeWakuMessage(@[byte 5], 
contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + maxPageSize=2, + ascendingOrder=false + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[6..7] + + ## Cleanup + driver.close().expect("driver to close") + + test "multiple content topic": + ## Given + const contentTopic1 = "test-content-topic-1" + const contentTopic2 = "test-content-topic-2" + const contentTopic3 = "test-content-topic-3" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(ts=ts(00)), + fakeWakuMessage(ts=ts(10)), + + fakeWakuMessage(@[byte 1], contentTopic=contentTopic1, ts=ts(20)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic2, ts=ts(30)), + + fakeWakuMessage(@[byte 3], contentTopic=contentTopic3, ts=ts(40)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic1, ts=ts(50)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic2, ts=ts(60)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic3, ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic1, contentTopic2], + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[2..3] + + ## Cleanup + driver.close().expect("driver to close") + + test "single content topic - no results": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(20)), 
+ fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=ts(60)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 0 + + ## Cleanup + driver.close().expect("driver to close") + + test "content topic and max page size - not enough messages stored": + ## Given + const pageSize: uint = 50 + + let driver = newTestSqliteDriver() + + for t in 0..<40: + let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[DefaultContentTopic], + maxPageSize=pageSize, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 40 + + ## Cleanup + driver.close().expect("driver to close") + + +suite "SQLite driver - query by pubsub topic": + + test "pubsub topic": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30))), + + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40))), + 
(pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50))), + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + pubsubTopic=some(pubsubTopic), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == expectedMessages[4..5] + + ## Cleanup + driver.close().expect("driver to close") + + test "no pubsub topic": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), + + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50))), + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + 
filteredMessages == expectedMessages[0..1] + + ## Cleanup + driver.close().expect("driver to close") + + test "content topic and pubsub topic": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30))), + + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50))), + + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70))), + ] + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == expectedMessages[4..5] + + ## Cleanup + driver.close().expect("driver to close") + + +suite "SQLite driver - query by cursor": + + test "only cursor": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), # << cursor + + 
fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[4]) + + ## When + let res = driver.getMessages( + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[5..6] + + ## Cleanup + driver.close().expect("driver to close") + + test "only cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), # << cursor + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[4]) + + ## When + let res = driver.getMessages( + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=false + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[2..3] + + ## Cleanup + driver.close().expect("driver to close") + + test "content topic and cursor": + ## Given + const contentTopic = "test-content-topic" + + let 
driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), # << cursor + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[4]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[5..6] + + ## Cleanup + driver.close().expect("driver to close") + + test "content topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let messages = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), # << cursor + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[6]) + + ## When + let res = driver.getMessages( + contentTopic= 
@[contentTopic], + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=false + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 4 + filteredMessages == messages[2..5] + + ## Cleanup + driver.close().expect("driver to close") + + test "pubsub topic and cursor": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), # << cursor + + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin))), + + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(messages[5][0], messages[5][1]) + + ## When + let res = driver.getMessages( + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + 
filteredMessages == expectedMessages[6..7] + + ## Cleanup + driver.close().expect("driver to close") + + test "pubsub topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin))), # << cursor + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(messages[6][0], messages[6][1]) + + ## When + let res = driver.getMessages( + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=false + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == expectedMessages[4..5] + + ## Cleanup + driver.close().expect("driver to close") + + +suite "SQLite driver - query by time range": + + test "start time only": + ## Given + 
const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 5 + filteredMessages == messages[2..6] + + ## Cleanup + driver.close().expect("driver to close") + + test "end time only": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + # end_time + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), 
msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + endTime=some(ts(45, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 5 + filteredMessages == messages[0..4] + + ## Cleanup + driver.close().expect("driver to close") + + test "start time and end time": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + # start_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + # end_time + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + startTime=some(ts(15, timeOrigin)), + endTime=some(ts(45, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + 
filteredMessages.len == 3 + filteredMessages == expectedMessages[2..4] + + ## Cleanup + driver.close().expect("driver to close") + + test "invalid time range - no results": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + # end_time + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + startTime=some(ts(45, timeOrigin)), + endTime=some(ts(15, timeOrigin)), + maxPageSize=2, + ascendingOrder=true + ) + + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 0 + + ## Cleanup + driver.close().expect("driver to close") + + test "time range start and content topic": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + 
fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 5 + filteredMessages == messages[2..6] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range start and content topic - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], ts=ts(90, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=false + ) + + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 5 + filteredMessages == 
messages[2..6] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range start, single content topic and cursor": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), # << cursor + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[3]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 6 + filteredMessages == messages[4..9] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range start, single content topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + 
fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), # << cursor + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin)), + ] + + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[6]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=false + ) + + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == messages[3..4] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range, content topic, pubsub topic and cursor": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + # start_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + # end_time + 
(pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, messages[1][1]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(0, timeOrigin)), + endTime=some(ts(45, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == expectedMessages[3..4] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range, content topic, pubsub topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + # start_time + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, 
fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + # end_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(messages[7][0], messages[7][1]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(35, timeOrigin)), + endTime=some(ts(85, timeOrigin)), + maxPageSize=10, + ascendingOrder=false + ) + + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == expectedMessages[4..5] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + # start_time + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, 
fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + # end_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(messages[1][0], messages[1][1]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(35, timeOrigin)), + endTime=some(ts(85, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + check: + res.isOk() + + let expectedMessages = messages.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 2 + filteredMessages == expectedMessages[4..5] + + ## Cleanup + driver.close().expect("driver to close") + + test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestSqliteDriver() + + let timeOrigin = now() + let messages = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + # start_time + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), + 
(DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + # end_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + + for row in messages: + let (topic, msg) = row + require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + + let cursor = computeTestCursor(messages[1][0], messages[1][1]) + + ## When + let res = driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(35, timeOrigin)), + endTime=some(ts(85, timeOrigin)), + maxPageSize=10, + ascendingOrder=false, + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 0 + + ## Cleanup + driver.close().expect("driver to close") diff --git a/tests/v2/waku_archive/test_retention_policy.nim b/tests/v2/waku_archive/test_retention_policy.nim new file mode 100644 index 000000000..be3d7554f --- /dev/null +++ b/tests/v2/waku_archive/test_retention_policy.nim @@ -0,0 +1,93 @@ +{.used.} + +import + std/sequtils, + stew/results, + testutils/unittests, + chronos +import + ../../../waku/common/sqlite, + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/sqlite_driver, + ../../../waku/v2/protocol/waku_archive/retention_policy, + ../../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/utils/time, + ../utils, + ../testlib/common + + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.new(":memory:").tryGet() + +proc newTestArchiveDriver(): ArchiveDriver = + let db = newTestDatabase() + SqliteDriver.new(db).tryGet() + + +suite "Waku Archive - Retention policy": + + test "capacity retention policy - windowed message deletion": + ## Given + let + capacity = 100 + excess = 65 + + let driver = newTestArchiveDriver() + + let retentionPolicy: 
RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) + + ## When + for i in 1..capacity+excess: + let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) + + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require retentionPolicy.execute(driver).isOk() + + ## Then + let numMessages = driver.getMessagesCount().tryGet() + check: + # Expected number of messages is 120 because + # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) + # the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store + numMessages == 120 + + ## Cleanup + driver.close().expect("driver to close") + + test "store capacity should be limited": + ## Given + const capacity = 5 + const contentTopic = "test-content-topic" + + let + driver = newTestArchiveDriver() + retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) + + let messages = @[ + fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)), + fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(1)), + + fakeWakuMessage(contentTopic=contentTopic, ts=ts(2)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(3)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(4)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(5)), + fakeWakuMessage(contentTopic=contentTopic, ts=ts(6)) + ] + + ## When + for msg in messages: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require retentionPolicy.execute(driver).isOk() + + ## Then + let storedMsg = driver.getAllMessages().tryGet() + check: + storedMsg.len == capacity + storedMsg.all do (item: auto) -> bool: + let (pubsubTopic, msg, digest, storeTimestamp) = item + msg.contentTopic == contentTopic and + pubsubTopic == DefaultPubsubTopic + + ## Cleanup + driver.close().expect("driver to close") diff --git a/tests/v2/waku_archive/test_waku_archive.nim 
b/tests/v2/waku_archive/test_waku_archive.nim new file mode 100644 index 000000000..3d805c6ce --- /dev/null +++ b/tests/v2/waku_archive/test_waku_archive.nim @@ -0,0 +1,538 @@ +{.used.} + +import + std/[options, sequtils], + testutils/unittests, + chronicles, + libp2p/crypto/crypto +import + ../../../waku/common/sqlite, + ../../../waku/v2/node/peer_manager/peer_manager, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_archive/driver/sqlite_driver, + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/utils/time, + ../testlib/common, + ../testlib/switch + + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.new(":memory:").tryGet() + +proc newTestArchiveDriver(): ArchiveDriver = + let db = newTestDatabase() + SqliteDriver.new(db).tryGet() + +proc newTestWakuArchive(driver: ArchiveDriver): WakuArchive = + let validator: MessageValidator = DefaultMessageValidator() + WakuArchive.new(driver, validator=some(validator)) + + +suite "Waku Archive - message handling": + + test "it should driver a valid and non-ephemeral message": + ## Setup + let driver = newTestArchiveDriver() + let archive = newTestWakuArchive(driver) + + ## Given + let validSenderTime = now() + let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime) + + ## When + archive.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + driver.getMessagesCount().tryGet() == 1 + + test "it should not driver an ephemeral message": + ## Setup + let driver = newTestArchiveDriver() + let archive = newTestWakuArchive(driver) + + ## Given + let msgList = @[ + fakeWakuMessage(ephemeral = false, payload = "1"), + fakeWakuMessage(ephemeral = true, payload = "2"), + fakeWakuMessage(ephemeral = true, payload = "3"), + fakeWakuMessage(ephemeral = true, payload = "4"), + fakeWakuMessage(ephemeral = false, payload = "5"), + ] + + ## When + for msg in msgList: + archive.handleMessage(DefaultPubsubTopic, msg) + + ## Then + check: + driver.getMessagesCount().tryGet() == 2 
+ + test "it should driver a message with no sender timestamp": + ## Setup + let driver = newTestArchiveDriver() + let archive = newTestWakuArchive(driver) + + ## Given + let invalidSenderTime = 0 + let message = fakeWakuMessage(ts=invalidSenderTime) + + ## When + archive.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + driver.getMessagesCount().tryGet() == 1 + + test "it should not driver a message with a sender time variance greater than max time variance (future)": + ## Setup + let driver = newTestArchiveDriver() + let archive = newTestWakuArchive(driver) + + ## Given + let + now = now() + invalidSenderTime = now + MaxMessageTimestampVariance + 1 + + let message = fakeWakuMessage(ts=invalidSenderTime) + + ## When + archive.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + driver.getMessagesCount().tryGet() == 0 + + test "it should not driver a message with a sender time variance greater than max time variance (past)": + ## Setup + let driver = newTestArchiveDriver() + let archive = newTestWakuArchive(driver) + + ## Given + let + now = now() + invalidSenderTime = now - MaxMessageTimestampVariance - 1 + + let message = fakeWakuMessage(ts=invalidSenderTime) + + ## When + archive.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + driver.getMessagesCount().tryGet() == 0 + + +procSuite "Waku Archive - find messages": + ## Fixtures + let timeOrigin = now() + let archiveA = block: + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let msgList = @[ + fakeWakuMessage(@[byte 00], contentTopic=ContentTopic("2"), ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], contentTopic=ContentTopic("1"), ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], contentTopic=ContentTopic("2"), ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], contentTopic=ContentTopic("1"), ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], contentTopic=ContentTopic("2"), ts=ts(40, timeOrigin)), + 
fakeWakuMessage(@[byte 05], contentTopic=ContentTopic("1"), ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], contentTopic=ContentTopic("2"), ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], contentTopic=ContentTopic("1"), ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], contentTopic=ContentTopic("2"), ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], contentTopic=ContentTopic("1"), ts=ts(90, timeOrigin)) + ] + + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + archive + + test "handle query": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let topic = ContentTopic("1") + let + msg1 = fakeWakuMessage(contentTopic=topic) + msg2 = fakeWakuMessage() + + archive.handleMessage("foo", msg1) + archive.handleMessage("foo", msg2) + + ## Given + let req = ArchiveQuery(contentTopics: @[topic]) + + ## When + let queryRes = archive.findMessages(req) + + ## Then + check: + queryRes.isOk() + + let response = queryRes.tryGet() + check: + response.messages.len == 1 + response.messages == @[msg1] + + test "handle query with multiple content filters": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let + topic1 = ContentTopic("1") + topic2 = ContentTopic("2") + topic3 = ContentTopic("3") + + let + msg1 = fakeWakuMessage(contentTopic=topic1) + msg2 = fakeWakuMessage(contentTopic=topic2) + msg3 = fakeWakuMessage(contentTopic=topic3) + + archive.handleMessage("foo", msg1) + archive.handleMessage("foo", msg2) + archive.handleMessage("foo", msg3) + + ## Given + let req = ArchiveQuery(contentTopics: @[topic1, topic3]) + + ## When + let queryRes = archive.findMessages(req) + + ## Then + check: + queryRes.isOk() + + let response = queryRes.tryGet() + check: + response.messages.len() == 2 + response.messages.anyIt(it == msg1) + response.messages.anyIt(it == msg3) + + test "handle query with pubsub topic filter": + ## Setup 
+ let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let + pubsubTopic1 = "queried-topic" + pubsubTopic2 = "non-queried-topic" + + let + contentTopic1 = ContentTopic("1") + contentTopic2 = ContentTopic("2") + contentTopic3 = ContentTopic("3") + + let + msg1 = fakeWakuMessage(contentTopic=contentTopic1) + msg2 = fakeWakuMessage(contentTopic=contentTopic2) + msg3 = fakeWakuMessage(contentTopic=contentTopic3) + + archive.handleMessage(pubsubtopic1, msg1) + archive.handleMessage(pubsubtopic2, msg2) + archive.handleMessage(pubsubtopic2, msg3) + + ## Given + # This query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) + let req = ArchiveQuery( + pubsubTopic: some(pubsubTopic1), + contentTopics: @[contentTopic1, contentTopic3] + ) + + ## When + let queryRes = archive.findMessages(req) + + ## Then + check: + queryRes.isOk() + + let response = queryRes.tryGet() + check: + response.messages.len() == 1 + response.messages.anyIt(it == msg1) + + test "handle query with pubsub topic filter - no match": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let + pubsubtopic1 = "queried-topic" + pubsubtopic2 = "non-queried-topic" + + let + msg1 = fakeWakuMessage() + msg2 = fakeWakuMessage() + msg3 = fakeWakuMessage() + + archive.handleMessage(pubsubtopic2, msg1) + archive.handleMessage(pubsubtopic2, msg2) + archive.handleMessage(pubsubtopic2, msg3) + + ## Given + let req = ArchiveQuery(pubsubTopic: some(pubsubTopic1)) + + ## When + let res = archive.findMessages(req) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 0 + + test "handle query with pubsub topic filter - match the entire stored messages": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let pubsubTopic = "queried-topic" + + let + msg1 = fakeWakuMessage(payload="TEST-1") + msg2 = fakeWakuMessage(payload="TEST-2") + msg3 = 
fakeWakuMessage(payload="TEST-3") + + archive.handleMessage(pubsubTopic, msg1) + archive.handleMessage(pubsubTopic, msg2) + archive.handleMessage(pubsubTopic, msg3) + + ## Given + let req = ArchiveQuery(pubsubTopic: some(pubsubTopic)) + + ## When + let res = archive.findMessages(req) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 3 + response.messages.anyIt(it == msg1) + response.messages.anyIt(it == msg2) + response.messages.anyIt(it == msg3) + + test "handle query with forward pagination": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let currentTime = now() + let msgList = @[ + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=currentTime - 9), + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=currentTime - 8), + fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=currentTime - 7), + fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=currentTime - 6), + fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=currentTime - 5), + fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=currentTime - 4), + fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic, ts=currentTime - 3), + fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic, ts=currentTime - 2), + fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic, ts=currentTime - 1), + fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=currentTime) + ] + + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## Given + var req = ArchiveQuery( + contentTopics: @[DefaultContentTopic], + pageSize: 2, + ascending: true + ) + + ## When + var res = archive.findMessages(req) + require res.isOk() + + var + response = res.tryGet() + totalMessages = response.messages.len() + totalQueries = 1 + + while response.cursor.isSome(): + require: + totalQueries <= 4 # Sanity 
check here and guarantee that the test will not run forever + response.messages.len() == 2 + + req.cursor = response.cursor + + # Continue querying + res = archive.findMessages(req) + require res.isOk() + response = res.tryGet() + totalMessages += response.messages.len() + totalQueries += 1 + + ## Then + check: + totalQueries == 4 # 4 queries of pageSize 2 + totalMessages == 8 # 8 messages in total + + test "handle query with backward pagination": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let currentTime = now() + let msgList = @[ + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=currentTime - 9), + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=currentTime - 8), + fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=currentTime - 7), + fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=currentTime - 6), + fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=currentTime - 5), + fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic, ts=currentTime - 4), + fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic, ts=currentTime - 3), + fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic, ts=currentTime - 2), + fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic, ts=currentTime - 1), + fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=currentTime) + ] + + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## Given + var req = ArchiveQuery( + contentTopics: @[DefaultContentTopic], + pageSize: 2, + ascending: false + ) + + ## When + var res = archive.findMessages(req) + require res.isOk() + + var + response = res.tryGet() + totalMessages = response.messages.len() + totalQueries = 1 + + while response.cursor.isSome(): + require: + totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever + response.messages.len() == 2 + + 
req.cursor = response.cursor + + # Continue querying + res = archive.findMessages(req) + require res.isOk() + response = res.tryGet() + totalMessages += response.messages.len() + totalQueries += 1 + + ## Then + check: + totalQueries == 4 # 4 queries of pageSize 2 + totalMessages == 8 # 8 messages in total + + test "handle query with no paging info - auto-pagination": + ## Setup + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) + + let msgList = @[ + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2")), + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 5], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 6], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 7], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 8], contentTopic=DefaultContentTopic), + fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2")) + ] + + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + ## Given + let req = ArchiveQuery(contentTopics: @[DefaultContentTopic]) + + ## When + let res = archive.findMessages(req) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + ## No pagination specified. Response will be auto-paginated with + ## up to MaxPageSize messages per page. 
+ response.messages.len() == 8 + response.cursor.isNone() + + test "handle temporal history query with a valid time window": + ## Given + let req = ArchiveQuery( + contentTopics: @[ContentTopic("1")], + startTime: some(ts(15, timeOrigin)), + endTime: some(ts(55, timeOrigin)), + ascending: true + ) + + ## When + let res = archiveA.findMessages(req) + + ## Then + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 2 + response.messages.mapIt(it.timestamp) == @[ts(30, timeOrigin), ts(50, timeOrigin)] + + test "handle temporal history query with a zero-size time window": + ## A zero-size window results in an empty list of history messages + ## Given + let req = ArchiveQuery( + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(2)), + endTime: some(Timestamp(2)) + ) + + ## When + let res = archiveA.findMessages(req) + + ## Then + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len == 0 + + test "handle temporal history query with an invalid time window": + ## A history query with an invalid time range results in an empty list of history messages + ## Given + let req = ArchiveQuery( + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(5)), + endTime: some(Timestamp(2)) + ) + + ## When + let res = archiveA.findMessages(req) + + ## Then + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len == 0 diff --git a/waku/v2/protocol/waku_archive.nim b/waku/v2/protocol/waku_archive.nim new file mode 100644 index 000000000..9d716c948 --- /dev/null +++ b/waku/v2/protocol/waku_archive.nim @@ -0,0 +1,11 @@ +import + ./waku_archive/common, + ./waku_archive/archive, + ./waku_archive/driver, + ./waku_archive/retention_policy + +export + common, + archive, + driver, + retention_policy diff --git a/waku/v2/protocol/waku_archive/archive.nim b/waku/v2/protocol/waku_archive/archive.nim new file mode 100644 index 000000000..be44a45de --- /dev/null +++ 
b/waku/v2/protocol/waku_archive/archive.nim @@ -0,0 +1,204 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + + +import + std/[tables, times, sequtils, options], + stew/results, + chronicles, + chronos, + metrics +import + ../../utils/time, + ../waku_message, + ./common, + ./archive_metrics, + ./retention_policy, + ./driver + + +logScope: + topics = "waku archive" + +const + DefaultPageSize*: uint = 20 + + MaxPageSize*: uint = 100 + + +## Message validation + +type + MessageValidator* = ref object of RootObj + + ValidationResult* = Result[void, string] + +method validate*(validator: MessageValidator, msg: WakuMessage): ValidationResult {.base.} = discard + + +# Default message validator + +const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" + + +type DefaultMessageValidator* = ref object of MessageValidator + +method validate*(validator: DefaultMessageValidator, msg: WakuMessage): ValidationResult = + if msg.timestamp == 0: + return ok() + + let + now = getNanosecondTime(getTime().toUnixFloat()) + lowerBound = now - MaxMessageTimestampVariance + upperBound = now + MaxMessageTimestampVariance + + if msg.timestamp < lowerBound: + return err(invalidMessageOld) + + if upperBound < msg.timestamp: + return err(invalidMessageFuture) + + ok() + + +## Archive + +type + WakuArchive* = ref object + driver*: ArchiveDriver + validator: MessageValidator + retentionPolicy: RetentionPolicy + +proc new*(T: type WakuArchive, + driver: ArchiveDriver, + validator = none(MessageValidator), + retentionPolicy = none(RetentionPolicy)): T = + WakuArchive( + driver: driver, + validator: validator.get(nil), + retentionPolicy: retentionPolicy.get(nil) + ) + + + +proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) = + if msg.ephemeral: + # Ephemeral message, do not store + return + + if not w.validator.isNil(): + let validationRes = 
w.validator.validate(msg) + if validationRes.isErr(): + waku_archive_errors.inc(labelValues = [invalidMessage]) + return + + + waku_archive_insert_duration_seconds.time: + let + msgDigest = computeDigest(msg) + msgReceivedTime = if msg.timestamp > 0: msg.timestamp + else: getNanosecondTime(getTime().toUnixFloat()) + + trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest + + let putRes = w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime) + if putRes.isErr(): + error "failed to insert message", err=putRes.error + waku_archive_errors.inc(labelValues = [insertFailure]) + + +proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.} = + ## Search the archive to return a single page of messages matching the query criteria + let + qContentTopics = query.contentTopics + qPubSubTopic = query.pubsubTopic + qCursor = query.cursor + qStartTime = query.startTime + qEndTime = query.endTime + qMaxPageSize = if query.pageSize <= 0: DefaultPageSize + else: min(query.pageSize, MaxPageSize) + qAscendingOrder = query.ascending + + + var queryRes: ArchiveDriverResult[seq[ArchiveRow]] + + waku_archive_query_duration_seconds.time: + queryRes = w.driver.getMessages( + contentTopic = qContentTopics, + pubsubTopic = qPubSubTopic, + cursor = qCursor, + startTime = qStartTime, + endTime = qEndTime, + maxPageSize = qMaxPageSize + 1, + ascendingOrder = qAscendingOrder + ) + + # Build response + if queryRes.isErr(): + return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error)) + + let rows = queryRes.get() + + if rows.len <= 0: + return ok(ArchiveResponse( + messages: @[], + cursor: none(ArchiveCursor) + )) + + + # TODO: Move cursor generation to the driver implementation module + var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1]) + else: rows[0..^2].mapIt(it[1]) + var cursor = none(ArchiveCursor) + + if rows.len > int(qMaxPageSize): + ## Build last 
message cursor + ## The cursor is built from the last message INCLUDED in the response + ## (i.e. the second last message in the rows list) + let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] + + # TODO: Improve coherence of MessageDigest type + var messageDigest: array[32, byte] + for i in 0.. y + ## + ## Default sorting order priority is: + ## 1. senderTimestamp + ## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal) + ## 3. message digest + ## 4. pubsubTopic + + if x == y: + # Quick exit ensures receiver time does not affect index equality + return 0 + + # Timestamp has a higher priority for comparison + let + # Use receiverTime where senderTime is unset + xTimestamp = if x.senderTime == 0: x.receiverTime + else: x.senderTime + yTimestamp = if y.senderTime == 0: y.receiverTime + else: y.senderTime + + let timecmp = cmp(xTimestamp, yTimestamp) + if timecmp != 0: + return timecmp + + # Continue only when timestamps are equal + let digestcmp = cmp(x.digest.data, y.digest.data) + if digestcmp != 0: + return digestcmp + + return cmp(x.pubsubTopic, y.pubsubTopic) diff --git a/waku/v2/protocol/waku_archive/driver/queue_driver/queue_driver.nim b/waku/v2/protocol/waku_archive/driver/queue_driver/queue_driver.nim new file mode 100644 index 000000000..25a619bc8 --- /dev/null +++ b/waku/v2/protocol/waku_archive/driver/queue_driver/queue_driver.nim @@ -0,0 +1,307 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options, algorithm], + stew/results, + stew/sorted_set, + chronicles +import + ../../../../protocol/waku_message, + ../../../../utils/time, + ../../common, + ../../driver, + ./index + + +logScope: + topics = "waku archive queue_store" + + +const QueueDriverDefaultMaxCapacity* = 25_000 + + +type + IndexedWakuMessage = object + # TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message + ## 
This type is used to encapsulate a WakuMessage and its Index + msg*: WakuMessage + index*: Index + pubsubTopic*: string + + QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage): bool {.gcsafe, closure.} + +type + QueueDriverErrorKind {.pure.} = enum + INVALID_CURSOR + + QueueDriverGetPageResult = Result[seq[ArchiveRow], QueueDriverErrorKind] + +proc `$`(error: QueueDriverErrorKind): string = + case error: + of INVALID_CURSOR: + "invalid_cursor" + + +type QueueDriver* = ref object of ArchiveDriver + ## Bounded repository for indexed messages + ## + ## The store queue will keep messages up to its + ## configured capacity. As soon as this capacity + ## is reached and a new message is added, the oldest + ## item will be removed to make space for the new one. + ## This implies both a `delete` and `add` operation + ## for new items. + ## + ## TODO: a circular/ring buffer may be a more efficient implementation + ## TODO: we don't need to store the Index twice (as key and in the value) + items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages + capacity: int # Maximum amount of messages to keep + + +### Helpers + +proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], + startCursor: Index, + forward: bool): SortedSetResult[Index, IndexedWakuMessage] = + ## Walk to util we find the cursor + ## TODO: Improve performance here with a binary/tree search + + var nextItem = if forward: w.first() + else: w.last() + + ## Fast forward until we reach the startCursor + while nextItem.isOk(): + if nextItem.value.key == startCursor: + break + + # Not yet at cursor. 
Continue advancing + nextItem = if forward: w.next() + else: w.prev() + + return nextItem + + +#### API + +proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T = + var items = SortedSet[Index, IndexedWakuMessage].init() + return QueueDriver(items: items, capacity: capacity) + + +proc contains*(driver: QueueDriver, index: Index): bool = + ## Return `true` if the store queue already contains the `index`, `false` otherwise. + driver.items.eq(index).isOk() + +proc len*(driver: QueueDriver): int {.noSideEffect.} = + driver.items.len + +proc getPage(driver: QueueDriver, + pageSize: uint = 0, + forward: bool = true, + cursor: Option[Index] = none(Index), + predicate: QueryFilterMatcher = nil): QueueDriverGetPageResult = + ## Populate a single page in forward direction + ## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined. + ## Page size must not exceed `maxPageSize` + ## Each entry must match the `pred` + var outSeq: seq[ArchiveRow] + + var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + defer: w.destroy() + + var currentEntry: SortedSetResult[Index, IndexedWakuMessage] + + # Find starting entry + if cursor.isSome(): + let cursorEntry = w.walkToCursor(cursor.get(), forward) + if cursorEntry.isErr(): + return err(QueueDriverErrorKind.INVALID_CURSOR) + + # Advance walker once more + currentEntry = if forward: w.next() + else: w.prev() + else: + # Start from the beginning of the queue + currentEntry = if forward: w.first() + else: w.last() + + trace "Starting page query", currentEntry=currentEntry + + ## This loop walks forward over the queue: + ## 1. from the given cursor (or first/last entry, if not provided) + ## 2. adds entries matching the predicate function to output page + ## 3. 
until either the end of the queue or maxPageSize is reached + var numberOfItems: uint = 0 + while currentEntry.isOk() and numberOfItems < pageSize: + trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems + + if predicate.isNil() or predicate(currentEntry.value.data): + let + key = currentEntry.value.key + data = currentEntry.value.data + + numberOfItems += 1 + + outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime)) + + currentEntry = if forward: w.next() + else: w.prev() + + trace "Successfully retrieved page", len=outSeq.len + + return ok(outSeq) + + +## --- SortedSet accessors --- + +iterator fwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) = + ## Forward iterator over the entire store queue + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + res = w.first() + + while res.isOk(): + yield (res.value.key, res.value.data) + res = w.next() + + w.destroy() + +iterator bwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) = + ## Backwards iterator over the entire store queue + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + res = w.last() + + while res.isOk(): + yield (res.value.key, res.value.data) + res = w.prev() + + w.destroy() + +proc first*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] = + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + res = w.first() + w.destroy() + + if res.isErr(): + return err("Not found") + + return ok(res.value.data) + +proc last*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] = + var + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + res = w.last() + w.destroy() + + if res.isErr(): + return err("Not found") + + return ok(res.value.data) + + +## --- Queue API --- + +proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[void] = + ## Add a message to the queue + ## + ## If we're at capacity, we will be removing, the 
oldest (first) item + if driver.contains(msg.index): + trace "could not add item to store queue. Index already exists", index=msg.index + return err("duplicate") + + # TODO: the below delete block can be removed if we convert to circular buffer + if driver.items.len >= driver.capacity: + var + w = SortedSetWalkRef[Index, IndexedWakuMessage].init(driver.items) + firstItem = w.first + + if cmp(msg.index, firstItem.value.key) < 0: + # When at capacity, we won't add if message index is smaller (older) than our oldest item + w.destroy # Clean up walker + return err("too_old") + + discard driver.items.delete(firstItem.value.key) + w.destroy # better to destroy walker after a delete operation + + driver.items.insert(msg.index).value.data = msg + + return ok() + + +method put*(driver: QueueDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] = + let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) + let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) + driver.add(message) + + +method getAllMessages*(driver: QueueDriver): ArchiveDriverResult[seq[ArchiveRow]] = + # TODO: Implement this message_store method + err("interface method not implemented") + +method getMessages*( + driver: QueueDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true +): ArchiveDriverResult[seq[ArchiveRow]] = + let cursor = cursor.map(toIndex) + + let matchesQuery: QueryFilterMatcher = proc(row: IndexedWakuMessage): bool = + if pubsubTopic.isSome() and row.pubsubTopic != pubsubTopic.get(): + return false + + if contentTopic.len > 0 and row.msg.contentTopic notin contentTopic: + return false + + if startTime.isSome() and row.msg.timestamp < 
startTime.get(): + return false + + if endTime.isSome() and row.msg.timestamp > endTime.get(): + return false + + return true + + var pageRes: QueueDriverGetPageResult + try: + pageRes = driver.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery) + except: + return err(getCurrentExceptionMsg()) + + if pageRes.isErr(): + return err($pageRes.error) + + var rows = pageRes.value + + # All messages MUST be returned in chronological order + if not ascendingOrder: + reverse(rows) + + ok(rows) + + +method getMessagesCount*(driver: QueueDriver): ArchiveDriverResult[int64] = + ok(int64(driver.len())) + +method getOldestMessageTimestamp*(driver: QueueDriver): ArchiveDriverResult[Timestamp] = + driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) + +method getNewestMessageTimestamp*(driver: QueueDriver): ArchiveDriverResult[Timestamp] = + driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) + + +method deleteMessagesOlderThanTimestamp*(driver: QueueDriver, ts: Timestamp): ArchiveDriverResult[void] = + # TODO: Implement this message_store method + err("interface method not implemented") + +method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, limit: int): ArchiveDriverResult[void] = + # TODO: Implement this message_store method + err("interface method not implemented") diff --git a/waku/v2/protocol/waku_archive/driver/sqlite_driver.nim b/waku/v2/protocol/waku_archive/driver/sqlite_driver.nim new file mode 100644 index 000000000..027e00488 --- /dev/null +++ b/waku/v2/protocol/waku_archive/driver/sqlite_driver.nim @@ -0,0 +1,8 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import ./sqlite_driver/sqlite_driver + +export sqlite_driver diff --git a/waku/v2/protocol/waku_archive/driver/sqlite_driver/cursor.nim b/waku/v2/protocol/waku_archive/driver/sqlite_driver/cursor.nim new file mode 100644 index 000000000..696197c02 --- /dev/null +++ 
b/waku/v2/protocol/waku_archive/driver/sqlite_driver/cursor.nim @@ -0,0 +1,14 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + + +import + ../../../../protocol/waku_message, + ../../../../utils/time, + ../../common + +type DbCursor* = (Timestamp, seq[byte], PubsubTopic) + +proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic) diff --git a/waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations.nim b/waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations.nim new file mode 100644 index 000000000..e0c764883 --- /dev/null +++ b/waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations.nim @@ -0,0 +1,38 @@ +{.push raises: [].} + +import + std/[tables, strutils, os], + stew/results, + chronicles +import + ../../../../common/sqlite, + ../../../../common/sqlite/migrations + + +logScope: + topics = "message_store.migration" + + +const SchemaVersion* = 7 # increase this when there is an update in the database schema + +template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." / ".." +const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store" + + +proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = + ## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then + ## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path + ## points to the directory holding the migrations scripts once the db is updated, it sets the + ## `user_version` to the `tragetVersion`. + ## + ## If not `targetVersion` is provided, it defaults to `SchemaVersion`. 
+ ## + ## NOTE: Down migration it is not currently supported + debug "starting message store's sqlite database migration" + + let migrationRes = migrations.migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath) + if migrationRes.isErr(): + return err("failed to execute migration scripts: " & migrationRes.error) + + debug "finished message store's sqlite database migration" + ok() diff --git a/waku/v2/protocol/waku_archive/driver/sqlite_driver/queries.nim b/waku/v2/protocol/waku_archive/driver/sqlite_driver/queries.nim new file mode 100644 index 000000000..4b38c2696 --- /dev/null +++ b/waku/v2/protocol/waku_archive/driver/sqlite_driver/queries.nim @@ -0,0 +1,387 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + + +import + std/[options, sequtils], + stew/[results, byteutils], + sqlite3_abi +import + ../../../../../common/sqlite, + ../../../../protocol/waku_message, + ../../../../utils/time, + ./cursor + + +const DbTable = "Message" + +type SqlQueryStr = string + + +### SQLite column helper methods + +proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage = + let + topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol)) + topicLength = sqlite3_column_bytes(s, contentTopicCol) + contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))) + let + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol)) + length = sqlite3_column_bytes(s, payloadCol) + payload = @(toOpenArray(p, 0, length-1)) + let version = sqlite3_column_int64(s, versionCol) + let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol) + + WakuMessage( + contentTopic: ContentTopic(contentTopic), + payload: payload , + version: uint32(version), + timestamp: Timestamp(senderTimestamp) + ) + +proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp = + let storedAt = 
sqlite3_column_int64(s, storedAtCol) + Timestamp(storedAt) + +proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): PubsubTopic = + let + pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol)) + pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol) + pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) + + pubsubTopic + +proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] = + let + digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol)) + digestLength = sqlite3_column_bytes(s, digestCol) + digest = @(toOpenArray(digestPointer, 0, digestLength-1)) + + digest + + +### SQLite queries + +## Create table + +proc createTableQuery(table: string): SqlQueryStr = + "CREATE TABLE IF NOT EXISTS " & table & " (" & + " pubsubTopic BLOB NOT NULL," & + " contentTopic BLOB NOT NULL," & + " payload BLOB," & + " version INTEGER NOT NULL," & + " timestamp INTEGER NOT NULL," & + " id BLOB," & + " storedAt INTEGER NOT NULL," & + " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + ") WITHOUT ROWID;" + +proc createTable*(db: SqliteDatabase): DatabaseResult[void] = + let query = createTableQuery(DbTable) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Create indices + +proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr = + "CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);" + +proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] = + let query = createOldestMessageTimestampIndexQuery(DbTable) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = + "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);" + +proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = + let query = 
createHistoryQueryIndexQuery(DbTable) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Insert message +type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) + +proc insertMessageQuery(table: string): SqlQueryStr = + "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & + " VALUES (?, ?, ?, ?, ?, ?, ?);" + +proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = + let query = insertMessageQuery(DbTable) + db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement") + + +## Count table messages + +proc countMessagesQuery(table: string): SqlQueryStr = + "SELECT COUNT(*) FROM " & table + +proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] = + var count: int64 + proc queryRowCallback(s: ptr sqlite3_stmt) = + count = sqlite3_column_int64(s, 0) + + let query = countMessagesQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err("failed to count number of messages in the database") + + ok(count) + + +## Get oldest message receiver timestamp + +proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr = + "SELECT MIN(storedAt) FROM " & table + +proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= + var timestamp: Timestamp + proc queryRowCallback(s: ptr sqlite3_stmt) = + timestamp = queryRowReceiverTimestampCallback(s, 0) + + let query = selectOldestMessageTimestampQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err("failed to get the oldest receiver timestamp from the database") + + ok(timestamp) + +## Get newest message receiver timestamp + +proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr = + "SELECT MAX(storedAt) FROM " & table + +proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= + var 
timestamp: Timestamp + proc queryRowCallback(s: ptr sqlite3_stmt) = + timestamp = queryRowReceiverTimestampCallback(s, 0) + + let query = selectNewestMessageTimestampQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err("failed to get the newest receiver timestamp from the database") + + ok(timestamp) + +## Delete messages older than timestamp + +proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr = + "DELETE FROM " & table & " WHERE storedAt < " & $ts + +proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] = + let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Delete oldest messages not within limit + +proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = + "DELETE FROM " & table & " WHERE id NOT IN (" & + " SELECT id FROM " & table & + " ORDER BY storedAt DESC" & + " LIMIT " & $limit & + ");" + +proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] = + # NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit + let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit) + discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) + ok() + + +## Select all messages + +proc selectAllMessagesQuery(table: string): SqlQueryStr = + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" & + " FROM " & table & + " ORDER BY storedAt ASC" + +proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] = + ## Retrieve all messages from the store. 
+ var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + proc queryRowCallback(s: ptr sqlite3_stmt) = + let + pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) + wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) + digest = queryRowDigestCallback(s, digestCol=6) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) + + rows.add((pubsubTopic, wakuMessage, digest, storedAt)) + + let query = selectAllMessagesQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err(res.error()) + + ok(rows) + + +## Select messages by history query with limit + +proc combineClauses(clauses: varargs[Option[string]]): Option[string] = + let whereSeq = @clauses.filterIt(it.isSome()).mapIt(it.get()) + if whereSeq.len <= 0: + return none(string) + + var where: string = whereSeq[0] + for clause in whereSeq[1..^1]: + where &= " AND " & clause + some(where) + +proc whereClause(cursor: Option[DbCursor], + pubsubTopic: Option[PubsubTopic], + contentTopic: seq[ContentTopic], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + ascending: bool): Option[string] = + + let cursorClause = if cursor.isNone(): + none(string) + else: + let comp = if ascending: ">" else: "<" + some("(storedAt, id) " & comp & " (?, ?)") + + let pubsubTopicClause = if pubsubTopic.isNone(): + none(string) + else: + some("pubsubTopic = (?)") + + let contentTopicClause = if contentTopic.len <= 0: + none(string) + else: + var where = "(" + where &= "contentTopic = (?)" + for _ in contentTopic[1..^1]: + where &= " OR contentTopic = (?)" + where &= ")" + some(where) + + let startTimeClause = if startTime.isNone(): + none(string) + else: + some("storedAt >= (?)") + + let endTimeClause = if endTime.isNone(): + none(string) + else: + some("storedAt <= (?)") + + combineClauses(cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause) + +proc 
selectMessagesWithLimitQuery(table: string, where: Option[string], limit: uint, ascending=true): SqlQueryStr = + let order = if ascending: "ASC" else: "DESC" + + var query: string + + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" + query &= " FROM " & table + + if where.isSome(): + query &= " WHERE " & where.get() + + query &= " ORDER BY storedAt " & order & ", id " & order + query &= " LIMIT " & $limit & ";" + + query + +proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): DatabaseResult[SqliteStmt[void, void]] = + var s: RawStmtPtr + checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil) + ok(SqliteStmt[void, void](s)) + +proc execSelectMessagesWithLimitStmt(s: SqliteStmt, + cursor: Option[DbCursor], + pubsubTopic: Option[PubsubTopic], + contentTopic: seq[ContentTopic], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + onRowCallback: DataProc): DatabaseResult[void] = + let s = RawStmtPtr(s) + + # Bind params + var paramIndex = 1 + + if cursor.isSome(): # cursor = storedAt, id, pubsubTopic + let (storedAt, id, _) = cursor.get() + checkErr bindParam(s, paramIndex, storedAt) + paramIndex += 1 + checkErr bindParam(s, paramIndex, id) + paramIndex += 1 + + if pubsubTopic.isSome(): + let pubsubTopic = toBytes(pubsubTopic.get()) + checkErr bindParam(s, paramIndex, pubsubTopic) + paramIndex += 1 + + for topic in contentTopic: + checkErr bindParam(s, paramIndex, topic.toBytes()) + paramIndex += 1 + + if startTime.isSome(): + let time = startTime.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + + if endTime.isSome(): + let time = endTime.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + + + try: + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + onRowCallback(s) + of SQLITE_DONE: + return ok() + else: + return err($sqlite3_errstr(v)) + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as 
step + discard sqlite3_clear_bindings(s) # no errors possible + +proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, + contentTopic: seq[ContentTopic], + pubsubTopic: Option[PubsubTopic], + cursor: Option[DbCursor], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + limit: uint, + ascending: bool): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] = + + + var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[] + proc queryRowCallback(s: ptr sqlite3_stmt) = + let + pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) + message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) + digest = queryRowDigestCallback(s, digestCol=6) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) + + messages.add((pubsubTopic, message, digest, storedAt)) + + let query = block: + let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending) + selectMessagesWithLimitQuery(DbTable, where, limit, ascending) + + let dbStmt = ?db.prepareSelectMessagesWithlimitStmt(query) + ?dbStmt.execSelectMessagesWithLimitStmt( + cursor, + pubsubTopic, + contentTopic, + startTime, + endTime, + queryRowCallback + ) + dbStmt.dispose() + + ok(messages) diff --git a/waku/v2/protocol/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/v2/protocol/waku_archive/driver/sqlite_driver/sqlite_driver.nim new file mode 100644 index 000000000..f6bb08485 --- /dev/null +++ b/waku/v2/protocol/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -0,0 +1,141 @@ +# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. 
+# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options, algorithm], + stew/[byteutils, results], + chronicles +import + ../../../../../common/sqlite, + ../../../../protocol/waku_message, + ../../../../utils/time, + ../../common, + ../../driver, + ./cursor, + ./queries + +logScope: + topics = "waku archive sqlite" + + + +proc init(db: SqliteDatabase): ArchiveDriverResult[void] = + ## Misconfiguration can lead to nil DB + if db.isNil(): + return err("db not initialized") + + # Create table, if doesn't exist + let resCreate = createTable(db) + if resCreate.isErr(): + return err("failed to create table: " & resCreate.error()) + + # Create indices, if don't exist + let resRtIndex = createOldestMessageTimestampIndex(db) + if resRtIndex.isErr(): + return err("failed to create i_rt index: " & resRtIndex.error()) + + let resMsgIndex = createHistoryQueryIndex(db) + if resMsgIndex.isErr(): + return err("failed to create i_msg index: " & resMsgIndex.error()) + + ok() + + +type SqliteDriver* = ref object of ArchiveDriver + db: SqliteDatabase + insertStmt: SqliteStmt[InsertMessageParams, void] + +proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] = + + # Database initialization + let resInit = init(db) + if resInit.isErr(): + return err(resInit.error()) + + # General initialization + let insertStmt = db.prepareInsertMessageStmt() + ok(SqliteDriver(db: db, insertStmt: insertStmt)) + +method close*(s: SqliteDriver): ArchiveDriverResult[void] = + ## Close the database connection + + # Dispose statements + s.insertStmt.dispose() + + # Close connection + s.db.close() + + ok() + + +method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] = + ## Inserts a message into the store + + let res = s.insertStmt.exec(( + 
+    @(digest.data),                # id
+    receivedTime,                  # storedAt
+    toBytes(message.contentTopic), # contentTopic
+    message.payload,               # payload
+    toBytes(pubsubTopic),          # pubsubTopic
+    int64(message.version),        # version
+    message.timestamp              # senderTimestamp
+  ))
+  if res.isErr():
+    return err("message insert failed: " & res.error)
+
+  ok()
+
+
+method getAllMessages*(s: SqliteDriver): ArchiveDriverResult[seq[ArchiveRow]] =
+  ## Retrieve all messages from the store.
+  s.db.selectAllMessages()
+
+
+method getMessages*(
+  s: SqliteDriver,
+  contentTopic: seq[ContentTopic] = @[],
+  pubsubTopic = none(PubsubTopic),
+  cursor = none(ArchiveCursor),
+  startTime = none(Timestamp),
+  endTime = none(Timestamp),
+  maxPageSize = DefaultPageSize,
+  ascendingOrder = true
+): ArchiveDriverResult[seq[ArchiveRow]] =
+  ## Query the archive with the given filter, returning at most `maxPageSize`
+  ## rows. Rows are always returned in chronological order regardless of the
+  ## paging direction requested via `ascendingOrder`.
+  let cursor = cursor.map(toDbCursor)
+
+  var rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
+    contentTopic,
+    pubsubTopic,
+    cursor,
+    startTime,
+    endTime,
+    limit=maxPageSize,
+    ascending=ascendingOrder
+  )
+
+  # All messages MUST be returned in chronological order
+  if not ascendingOrder:
+    reverse(rows)
+
+  ok(rows)
+
+
+method getMessagesCount*(s: SqliteDriver): ArchiveDriverResult[int64] =
+  s.db.getMessageCount()
+
+method getOldestMessageTimestamp*(s: SqliteDriver): ArchiveDriverResult[Timestamp] =
+  s.db.selectOldestReceiverTimestamp()
+
+method getNewestMessageTimestamp*(s: SqliteDriver): ArchiveDriverResult[Timestamp] =
+  # FIX: was `selectnewestReceiverTimestamp` — equivalent under Nim's
+  # style-insensitive identifier rules, but inconsistent with the sibling
+  # `selectOldestReceiverTimestamp` call above; normalized the casing.
+  s.db.selectNewestReceiverTimestamp()
+
+
+method deleteMessagesOlderThanTimestamp*(s: SqliteDriver, ts: Timestamp): ArchiveDriverResult[void] =
+  s.db.deleteMessagesOlderThanTimestamp(ts)
+
+method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver, limit: int): ArchiveDriverResult[void] =
+  s.db.deleteOldestMessagesNotWithinLimit(limit)
diff --git a/waku/v2/protocol/waku_archive/retention_policy.nim b/waku/v2/protocol/waku_archive/retention_policy.nim
new file mode 100644
index 000000000..defd89f1d
--- /dev/null
+++
b/waku/v2/protocol/waku_archive/retention_policy.nim
@@ -0,0 +1,16 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  stew/results
+import
+  ./driver
+
+type RetentionPolicyResult*[T] = Result[T, string]
+
+# Abstract base for archive retention policies. Concrete policies
+# (capacity-based, time-based) override `execute`.
+type RetentionPolicy* = ref object of RootObj
+
+
+# Base method: apply the policy against the given driver. The base
+# implementation is a no-op; dispatch is dynamic via `{.base.}`.
+method execute*(p: RetentionPolicy, store: ArchiveDriver): RetentionPolicyResult[void] {.base.} = discard
\ No newline at end of file
diff --git a/waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity.nim
new file mode 100644
index 000000000..e55266f21
--- /dev/null
+++ b/waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity.nim
@@ -0,0 +1,73 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  stew/results,
+  chronicles
+import
+  ../driver,
+  ../retention_policy
+
+logScope:
+  topics = "waku archive retention_policy"
+
+
+const DefaultCapacity*: int = 25_000
+
+const MaxOverflow = 1.3
+
+type
+  # CapacityRetentionPolicy implements auto deletion as follows:
+  # - The sqlite DB will store up to `totalCapacity = capacity` * `MaxOverflow` messages,
+  #   giving an overflowWindow of `capacity * (MaxOverflow - 1) = overflowWindow`.
+  #   (typo fix: "driver up to" -> "store up to", leftover from a store->driver rename)
+  #
+  # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
+  #   deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
+  #   bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
+  #
+  # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
+  # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
+  # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
+  # `senderTimestamp`.
However, `receiverTimestamp` can differ from node to node for the same message.
+  # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
+  # compensate that by keeping half of the overflow window.
+  CapacityRetentionPolicy* = ref object of RetentionPolicy
+    capacity: int      # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
+    totalCapacity: int # = capacity * MaxOverflow
+    deleteWindow: int  # = capacity * (MaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
+
+
+# Hard cap on rows kept in the DB before a batch delete triggers.
+proc calculateTotalCapacity(capacity: int, overflow: float): int =
+  int(float(capacity) * overflow)
+
+# Size of the slack between `capacity` and `totalCapacity`.
+proc calculateOverflowWindow(capacity: int, overflow: float): int =
+  int(float(capacity) * (overflow - 1))
+
+# Number of rows removed per batch delete (half the overflow window).
+proc calculateDeleteWindow(capacity: int, overflow: float): int =
+  calculateOverflowWindow(capacity, overflow) div 2
+
+
+proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
+  ## Build a capacity policy; the derived windows are precomputed once here.
+  let
+    totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
+    deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
+
+  CapacityRetentionPolicy(
+    capacity: capacity,
+    totalCapacity: totalCapacity,
+    deleteWindow: deleteWindow
+  )
+
+method execute*(p: CapacityRetentionPolicy, driver: ArchiveDriver): RetentionPolicyResult[void] =
+  ## Batch-delete the oldest messages once the row count reaches `totalCapacity`,
+  ## keeping `capacity + deleteWindow` rows (see the type-level comment above).
+  let numMessages = ?driver.getMessagesCount().mapErr(proc(err: string): string = "failed to get messages count: " & err)
+
+  # Below the hard cap: nothing to do.
+  if numMessages < p.totalCapacity:
+    return ok()
+
+  let res = driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)
+  if res.isErr():
+    return err("deleting oldest messages failed: " & res.error())
+
+  ok()
diff --git a/waku/v2/protocol/waku_archive/retention_policy/retention_policy_time.nim b/waku/v2/protocol/waku_archive/retention_policy/retention_policy_time.nim
new file mode 100644
index
000000000..f9985450c
--- /dev/null
+++ b/waku/v2/protocol/waku_archive/retention_policy/retention_policy_time.nim
@@ -0,0 +1,49 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  std/times,
+  stew/results,
+  chronicles,
+  chronos
+import
+  ../../../utils/time,
+  ../driver,
+  ../retention_policy
+
+logScope:
+  topics = "waku archive retention_policy"
+
+
+# Default retention period expressed in seconds (30 days).
+const DefaultRetentionTime*: int64 = 30.days.seconds
+
+
+# Deletes messages whose receiver timestamp is older than `retentionTime`.
+type TimeRetentionPolicy* = ref object of RetentionPolicy
+  retentionTime: chronos.Duration
+
+
+proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
+  ## `retentionTime` is given in seconds (see `DefaultRetentionTime`).
+  TimeRetentionPolicy(
+    retentionTime: retentionTime.seconds
+  )
+
+
+method execute*(p: TimeRetentionPolicy, driver: ArchiveDriver): RetentionPolicyResult[void] =
+  ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
+
+  let oldestReceiverTimestamp = ?driver.getOldestMessageTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err)
+
+  let now = getNanosecondTime(getTime().toUnixFloat())
+  # Messages stored before this timestamp are past their retention period.
+  let retentionTimestamp = now - p.retentionTime.nanoseconds
+  # Only trigger the (batch) delete once the oldest message exceeds retention
+  # by more than 10%, so we don't issue a DELETE on every execution.
+  let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
+
+  if thresholdTimestamp <= oldestReceiverTimestamp:
+    return ok()
+
+  let res = driver.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
+  if res.isErr():
+    return err("failed to delete oldest messages: " & res.error())
+
+  ok()