From 4d827694e7b95fbc6e08c7c514b2f83d32b04634 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 9 Jun 2023 12:42:33 +0200 Subject: [PATCH] feat(postgres): complete implementation of driver and apply more tests (#1785) --- tests/all_tests_v2.nim | 1 + .../v2/waku_archive/test_driver_postgres.nim | 2 +- .../test_driver_postgres_query.nim | 1395 +++++++++++++++++ .../driver/postgres_driver/asyncpool.nim | 4 +- .../postgres_driver/postgres_driver.nim | 106 +- 5 files changed, 1475 insertions(+), 33 deletions(-) create mode 100644 tests/v2/waku_archive/test_driver_postgres_query.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index d1a696f25..78d5bcc3e 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -24,6 +24,7 @@ when os == "Linux": # GitHub only supports container actions on Linux # and we need to start a postgress database in a docker container import + ./v2/waku_archive/test_driver_postgres_query, ./v2/waku_archive/test_driver_postgres # Waku store test suite diff --git a/tests/v2/waku_archive/test_driver_postgres.nim b/tests/v2/waku_archive/test_driver_postgres.nim index 1994af6b8..a444ae8e8 100644 --- a/tests/v2/waku_archive/test_driver_postgres.nim +++ b/tests/v2/waku_archive/test_driver_postgres.nim @@ -156,7 +156,7 @@ suite "Postgres driver": contentTopic2], cursor = some( computeTestCursor(pubsubTopic1, - messagesRes.get()[0][1]))) + messagesRes.get()[1][1]))) require messagesRes.isOk() require messagesRes.get().len == 1 diff --git a/tests/v2/waku_archive/test_driver_postgres_query.nim b/tests/v2/waku_archive/test_driver_postgres_query.nim new file mode 100644 index 000000000..84b832818 --- /dev/null +++ b/tests/v2/waku_archive/test_driver_postgres_query.nim @@ -0,0 +1,1395 @@ +{.used.} + +import + std/[options, sequtils, random, algorithm], + testutils/unittests, + chronos, + chronicles +import + ../../../waku/v2/waku_archive, + ../../../waku/v2/waku_archive/driver/postgres_driver, + ../../../waku/v2/waku_core, + ../testlib/common, + ../testlib/wakucore + +logScope: + topics = "test archive postgres driver" + +## This whole file is copied from the 'test_driver_sqlite_query.nim' file +## and it tests the same use cases but using the postgres driver. + +# Initialize the random number generator +common.randomize() + +const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" + +proc newTestPostgresDriver(): ArchiveDriver = + let driver = PostgresDriver.new(dbUrl = storeMessageDbUrl).tryGet() + discard waitFor driver.reset() + + let initRes = waitFor driver.init() + assert initRes.isOk(), initRes.error + + return driver + +proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor = + ArchiveCursor( + pubsubTopic: pubsubTopic, + senderTime: message.timestamp, + storeTime: message.timestamp, + digest: computeDigest(message) + ) + +suite "Postgres driver - query by content topic": + + asyncTest "no content topic": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)), + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + maxPageSize=5, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[0..4] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "single content topic": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..3] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "single content topic - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + maxPageSize=2, + ascendingOrder=false + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[6..7].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "multiple content topic": + ## Given + const contentTopic1 = "test-content-topic-1" + const contentTopic2 = "test-content-topic-2" + const contentTopic3 = "test-content-topic-3" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + + fakeWakuMessage(@[byte 2], contentTopic=contentTopic1, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic2, ts=ts(30)), + + fakeWakuMessage(@[byte 4], contentTopic=contentTopic3, ts=ts(40)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic1, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic2, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic3, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic1, contentTopic2], + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..3] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "single content topic - no results": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)), + fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=DefaultContentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=DefaultContentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=DefaultContentTopic, ts=ts(40)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 0 + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "content topic and max page size - not enough messages stored": + ## Given + const pageSize: uint = 50 + + let driver = newTestPostgresDriver() + + for t in 0..<40: + let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[DefaultContentTopic], + maxPageSize=pageSize, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 40 + + ## Cleanup + (await driver.close()).expect("driver to close") + +suite "Postgres driver - query by pubsub topic": + + asyncTest "pubsub topic": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30))), + + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50))), + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + pubsubTopic=some(pubsubTopic), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[4..5] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "no pubsub topic": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), + + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50))), + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[0..1] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "content topic and pubsub topic": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30))), + + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50))), + + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[4..5] + + ## Cleanup + (await driver.close()).expect("driver to close") + +suite "Postgres driver - query by cursor": + + asyncTest "only cursor": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), # << cursor + + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) + + ## When + let res = await driver.getMessages( + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[5..6] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "only cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), # << cursor + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) + + ## When + let res = await driver.getMessages( + cursor=some(cursor), + maxPageSize=2, + ascendingOrder=false + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..3].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "content topic and cursor": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), # << cursor + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), + fakeWakuMessage(@[byte 7], ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[5..6] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "content topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00)), + fakeWakuMessage(@[byte 1], ts=ts(10)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60)), # << cursor + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=false + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..5].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "pubsub topic and cursor": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), # << cursor + + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin))), + + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(expected[5][0], expected[5][1]) + + ## When + let res = await driver.getMessages( + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[6..7] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "pubsub topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin))), # << cursor + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(expected[6][0], expected[6][1]) + + ## When + let res = await driver.getMessages( + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + maxPageSize=10, + ascendingOrder=false + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[4..5].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + +suite "Postgres driver - query by time range": + + asyncTest "start time only": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..6] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "end time only": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + # end_time + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + endTime=some(ts(45, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[0..4] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "start time and end time": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + # start_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + # end_time + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + startTime=some(ts(15, timeOrigin)), + endTime=some(ts(45, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[2..4] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "invalid time range - no results": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + # end_time + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + startTime=some(ts(45, timeOrigin)), + endTime=some(ts(15, timeOrigin)), + maxPageSize=2, + ascendingOrder=true + ) + + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 0 + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range start and content topic": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..6] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range start and content topic - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], ts=ts(90, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=false + ) + + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2..6].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range start, single content topic and cursor": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), # << cursor + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[3]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[4..9] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range start, single content topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + # start_time + fakeWakuMessage(@[byte 2], ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), # << cursor + fakeWakuMessage(@[byte 7], contentTopic=contentTopic, ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + cursor=some(cursor), + startTime=some(ts(15, timeOrigin)), + maxPageSize=10, + ascendingOrder=false + ) + + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[3..4].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range, content topic, pubsub topic and cursor": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + # start_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + # end_time + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(0, timeOrigin)), + endTime=some(ts(45, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[3..4] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range, content topic, pubsub topic and cursor - descending order": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + # start_time + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + # end_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(expected[7][0], expected[7][1]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(35, timeOrigin)), + endTime=some(ts(85, timeOrigin)), + maxPageSize=10, + ascendingOrder=false + ) + + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[4..5].reversed() + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + # start_time + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + # end_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(expected[1][0], expected[1][1]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(35, timeOrigin)), + endTime=some(ts(85, timeOrigin)), + maxPageSize=10, + ascendingOrder=true + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.mapIt(it[1]) + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages[4..5] + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order": + ## Given + const contentTopic = "test-content-topic" + const pubsubTopic = "test-pubsub-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin))), # << cursor + (DefaultPubsubTopic, fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin))), + # start_time + (pubsubTopic, fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin))), + (pubsubTopic, fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin))), + (DefaultPubsubTopic, fakeWakuMessage(@[byte 8], contentTopic=contentTopic, ts=ts(80, timeOrigin))), + # end_time + (DefaultPubsubTopic, fakeWakuMessage(@[byte 9], contentTopic=contentTopic, ts=ts(90, timeOrigin))), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it[1].payload) + + for row in messages: + let (topic, msg) = row + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + + let cursor = computeTestCursor(expected[1][0], expected[1][1]) + + ## When + let res = await driver.getMessages( + contentTopic= @[contentTopic], + pubsubTopic=some(pubsubTopic), + cursor=some(cursor), + startTime=some(ts(35, timeOrigin)), + endTime=some(ts(85, timeOrigin)), + maxPageSize=10, + ascendingOrder=false, + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages.len == 0 + + ## Cleanup + (await driver.close()).expect("driver to close") + +suite "Postgres driver - retention policy": + + asyncTest "Get oldest and newest message timestamp": + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let oldestTime = ts(00, timeOrigin) + let newestTime = ts(100, timeOrigin) + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=oldestTime), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=newestTime), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + var res = await driver.getOldestMessageTimestamp() + assert res.isOk(), res.error + assert res.get() == oldestTime, "Failed to retrieve the latest timestamp" + + res = await driver.getNewestMessageTimestamp() + assert res.isOk(), res.error + assert res.get() == newestTime, "Failed to retrieve the newest timestamp" + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "Delete messages older than certain timestamp": + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let targetTime = ts(40, timeOrigin) + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=targetTime), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + var res = await driver.getMessagesCount() + assert res.isOk(), res.error + assert res.get() == 7, "Failed to retrieve the initial number of messages" + + let deleteRes = await driver.deleteMessagesOlderThanTimestamp(targetTime) + assert deleteRes.isOk(), deleteRes.error + + res = await driver.getMessagesCount() + assert res.isOk(), res.error + assert res.get() == 3, "Failed to retrieve the # of messages after deletion" + + ## Cleanup + (await driver.close()).expect("driver to close") + + asyncTest "Keep last n messages": + const contentTopic = "test-content-topic" + + let driver = newTestPostgresDriver() + + let timeOrigin = now() + let expected = @[ + fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic=contentTopic, ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic=contentTopic, ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], contentTopic=contentTopic, ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], contentTopic=contentTopic, ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], contentTopic=contentTopic, ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], contentTopic=contentTopic, ts=ts(60, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + for msg in messages: + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + + var res = await driver.getMessagesCount() + assert res.isOk(), res.error + assert res.get() == 7, "Failed to retrieve the initial number of messages" + + let deleteRes = await driver.deleteOldestMessagesNotWithinLimit(2) + assert deleteRes.isOk(), deleteRes.error + + res = await driver.getMessagesCount() + assert res.isOk(), res.error + assert res.get() == 2, "Failed to retrieve the # of messages after deletion" + + ## Cleanup + (await driver.close()).expect("driver to close") diff --git a/waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim b/waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim index 2d7866f31..e26655d04 100644 --- a/waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim +++ b/waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim @@ -131,6 +131,7 @@ proc query*(pool: PgAsyncPool, args: seq[string] = newSeq[string](0)): Future[Result[seq[Row], string]] {.async.} = ## Runs the SQL query getting results. + ## Retrieves info from the database. let connIndexRes = await pool.getConnIndex() if connIndexRes.isErr(): @@ -147,9 +148,10 @@ proc query*(pool: PgAsyncPool, proc exec*(pool: PgAsyncPool, query: string, - args: seq[string]): + args: seq[string] = newSeq[string](0)): Future[ArchiveDriverResult[void]] {.async.} = ## Runs the SQL query without results. + ## Alters the database state. let connIndexRes = await pool.getConnIndex() if connIndexRes.isErr(): diff --git a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim index 47786faf6..20124690e 100644 --- a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -4,10 +4,7 @@ else: {.push raises: [].} import - std/strformat, - std/nre, - std/options, - std/strutils, + std/[strformat,nre,options,strutils], stew/[results,byteutils], db_postgres, chronos @@ -71,7 +68,7 @@ proc new*(T: type PostgresDriver, proc createMessageTable(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = - let execRes = await s.connPool.exec(createTableQuery(), newSeq[string](0)) + let execRes = await s.connPool.exec(createTableQuery()) if execRes.isErr(): return err("error in createMessageTable: " & execRes.error) @@ -80,8 +77,11 @@ proc createMessageTable(s: PostgresDriver): proc deleteMessageTable*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = - let ret = await s.connPool.exec(dropTableQuery(), newSeq[string](0)) - return ret + let execRes = await s.connPool.exec(dropTableQuery()) + if execRes.isErr(): + return err("error in deleteMessageTable: " & execRes.error) + + return ok() proc init*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = @@ -196,7 +196,7 @@ method getMessages*(s: PostgresDriver, let comp = if ascendingOrder: ">" else: "<" statements.add("(storedAt, id) " & comp & " (?,?)") args.add($cursor.get().storeTime) - args.add($cursor.get().digest.data) + args.add(toHex(cursor.get().digest.data)) if startTime.isSome(): statements.add("storedAt >= ?") @@ -234,41 +234,85 @@ method getMessages*(s: PostgresDriver, return ok(results) +proc getInt(s: PostgresDriver, + query: string): + Future[ArchiveDriverResult[int64]] {.async.} = + # Performs a query that is expected to return a single numeric value (int64) + + let rowsRes = await s.connPool.query(query) + if rowsRes.isErr(): + return err("failed in getRow: " & rowsRes.error) + + let rows = rowsRes.get() + if rows.len != 1: + return err("failed in getRow. Expected one row but got " & $rows.len) + + let fields = rows[0] + if fields.len != 1: + return err("failed in getRow: Expected one field but got " & $fields.len) + + var retInt: int64 + try: + retInt = parseInt(fields[0]) + except ValueError: + return err("exception in getRow, parseInt: " & getCurrentExceptionMsg()) + + return ok(retInt) + method getMessagesCount*(s: PostgresDriver): Future[ArchiveDriverResult[int64]] {.async.} = - let rowsRes = await s.connPool.query("SELECT COUNT(1) FROM messages") - if rowsRes.isErr(): - return err("failed to get messages count: " & rowsRes.error) + let intRes = await s.getInt("SELECT COUNT(1) FROM messages") + if intRes.isErr(): + return err("error in getMessagesCount: " & intRes.error) - let rows = rowsRes.get() - if rows.len == 0: - return err("failed to get messages count: rows.len == 0") - - let rowFields = rows[0] - if rowFields.len == 0: - return err("failed to get messages count: rowFields.len == 0") - - let count = parseInt(rowFields[0]) - return ok(count) + return ok(intRes.get()) method getOldestMessageTimestamp*(s: PostgresDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = - return err("not implemented") + + let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages") + if intRes.isErr(): + return err("error in getOldestMessageTimestamp: " & intRes.error) + + return ok(Timestamp(intRes.get())) method getNewestMessageTimestamp*(s: PostgresDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = - return err("not implemented") -method deleteMessagesOlderThanTimestamp*(s: PostgresDriver, - ts: Timestamp): - Future[ArchiveDriverResult[void]] {.async.} = - return err("not implemented") + let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages") + if intRes.isErr(): + return err("error in getOldestMessageTimestamp: " & intRes.error) -method deleteOldestMessagesNotWithinLimit*(s: PostgresDriver, - limit: int): - Future[ArchiveDriverResult[void]] {.async.} = - return err("not implemented") + return ok(Timestamp(intRes.get())) + +method deleteMessagesOlderThanTimestamp*( + s: PostgresDriver, + ts: Timestamp): + Future[ArchiveDriverResult[void]] {.async.} = + + let execRes = await s.connPool.exec( + "DELETE FROM messages WHERE storedAt < " & $ts) + if execRes.isErr(): + return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error) + + return ok() + +method deleteOldestMessagesNotWithinLimit*( + s: PostgresDriver, + limit: int): + Future[ArchiveDriverResult[void]] {.async.} = + + let execRes = await s.connPool.exec( + """DELETE FROM messages WHERE id NOT IN + ( + SELECT id FROM messages ORDER BY storedAt DESC LIMIT ? + );""", + @[$limit]) + if execRes.isErr(): + return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error) + + return ok() method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =