diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index b61aa5897..092574f22 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -493,7 +493,7 @@ proc setupProtocols(node: WakuNode, if conf.store: # Archive setup - let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl, + let archiveDriverRes = waitFor ArchiveDriver.new(conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration, conf.storeMaxNumDbConnections, diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index d6d3750da..8f41c26ed 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -70,11 +70,11 @@ proc configureStore(node: WakuNode, discard # Archive setup - let archiveDriverRes = ArchiveDriver.new(storeDbUrl, - storeVacuum, - storeDbMigration, - storeMaxNumDbConnections, - onFatalErrorAction) + let archiveDriverRes = await ArchiveDriver.new(storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections, + onFatalErrorAction) if archiveDriverRes.isErr(): return err("failed to setup archive driver: " & archiveDriverRes.error) diff --git a/migrations/message_store_postgres/content_script_version_1.nim b/migrations/message_store_postgres/content_script_version_1.nim new file mode 100644 index 000000000..37c6bf2ec --- /dev/null +++ b/migrations/message_store_postgres/content_script_version_1.nim @@ -0,0 +1,20 @@ +const ContentScriptVersion_1* = """ +CREATE TABLE IF NOT EXISTS messages ( + pubsubTopic VARCHAR NOT NULL, + contentTopic VARCHAR NOT NULL, + payload VARCHAR, + version INTEGER NOT NULL, + timestamp BIGINT NOT NULL, + id VARCHAR NOT NULL, + messageHash VARCHAR NOT NULL, + storedAt BIGINT NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (messageHash) +); + +CREATE TABLE iF NOT EXISTS version ( + version INTEGER NOT NULL +); + +INSERT INTO version (version) VALUES(1); + +""" diff --git a/migrations/message_store_postgres/content_script_version_2.nim b/migrations/message_store_postgres/content_script_version_2.nim new file mode 100644 index 000000000..674e9993d --- /dev/null +++ b/migrations/message_store_postgres/content_script_version_2.nim @@ -0,0 +1,68 @@ +const ContentScriptVersion_2* = """ +ALTER TABLE messages RENAME TO messages_backup; +ALTER TABLE messages_backup DROP CONSTRAINT messageIndex; + +CREATE TABLE IF NOT EXISTS messages ( + pubsubTopic VARCHAR NOT NULL, + contentTopic VARCHAR NOT NULL, + payload VARCHAR, + version INTEGER NOT NULL, + timestamp BIGINT NOT NULL, + id VARCHAR NOT NULL, + messageHash VARCHAR NOT NULL, + storedAt BIGINT NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt) + ) PARTITION BY RANGE (storedAt); + +DO $$ +DECLARE + min_storedAt numeric; + max_storedAt numeric; + min_storedAtSeconds integer = 0; + max_storedAtSeconds integer = 0; + partition_name TEXT; + create_partition_stmt TEXT; +BEGIN + SELECT MIN(storedAt) into min_storedAt + FROM messages_backup; + + SELECT MAX(storedAt) into max_storedAt + FROM messages_backup; + + min_storedAtSeconds := min_storedAt / 1000000000; + max_storedAtSeconds := max_storedAt / 1000000000; + + partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds; + create_partition_stmt := 'CREATE TABLE ' || partition_name || + ' PARTITION OF messages FOR VALUES FROM (' || + min_storedAt || ') TO (' || (max_storedAt + 1) || ')'; + IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN + EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt; + END IF; +END $$; + +INSERT INTO messages ( + pubsubTopic, + contentTopic, + payload, + version, + timestamp, + id, + messageHash, + storedAt + ) + SELECT pubsubTopic, + contentTopic, + payload, + version, + timestamp, + id, + messageHash, + storedAt + FROM messages_backup; + +DROP TABLE messages_backup; + +UPDATE version SET version = 2 WHERE version = 1; + +""" diff --git a/migrations/message_store_postgres/pg_migration_manager.nim b/migrations/message_store_postgres/pg_migration_manager.nim new file mode 100644 index 000000000..a43a289b5 --- /dev/null +++ b/migrations/message_store_postgres/pg_migration_manager.nim @@ -0,0 +1,37 @@ + +import + content_script_version_1, + content_script_version_2 + +type + MigrationScript* = object + version*: int + scriptContent*: string + +proc init*(T: type MigrationScript, + targetVersion: int, + scriptContent: string): T = + + return MigrationScript( + targetVersion: targetVersion, + scriptContent: scriptContent) + +const PgMigrationScripts* = @[ + MigrationScript( + version: 1, + scriptContent: ContentScriptVersion_1), + MigrationScript( + version: 2, + scriptContent: ContentScriptVersion_2) +] + +proc getMigrationScripts*(currentVersion: int64, + targetVersion: int64): seq[string] = + var ret = newSeq[string]() + var v = currentVersion + while v < targetVersion: + ret.add(PgMigrationScripts[v].scriptContent) + v.inc() + return ret + + diff --git a/tests/testlib/postgres.nim b/tests/testlib/postgres.nim new file mode 100644 index 000000000..adaf8aead --- /dev/null +++ b/tests/testlib/postgres.nim @@ -0,0 +1,33 @@ + +import + chronicles, + chronos +import + ../../../waku/waku_archive, + ../../../waku/waku_archive/driver as driver_module, + ../../../waku/waku_archive/driver/builder, + ../../../waku/waku_archive/driver/postgres_driver + +const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" + +proc newTestPostgresDriver*(): Future[Result[ArchiveDriver, string]] {.async.} = + + proc onErr(errMsg: string) {.gcsafe, closure.} = + error "error creating ArchiveDriver", error = errMsg + quit(QuitFailure) + + let + vacuum = false + migrate = true + maxNumConn = 50 + + let driverRes = await ArchiveDriver.new(storeMessageDbUrl, + vacuum, + migrate, + maxNumConn, + onErr) + if driverRes.isErr(): + onErr("could not create archive driver: " & driverRes.error) + + return ok(driverRes.get()) + diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 90dbf7eeb..0f0d26605 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -9,7 +9,9 @@ import ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, - ../testlib/wakucore + ../testlib/wakucore, + ../testlib/testasync, + ../testlib/postgres proc now():int64 = getTime().toUnix() @@ -24,18 +26,24 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, ) suite "Postgres driver": + ## Unique driver instance + var driver {.threadvar.}: PostgresDriver - const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" + asyncSetup: + let driverRes = await newTestPostgresDriver() + if driverRes.isErr(): + assert false, driverRes.error + + driver = PostgresDriver(driverRes.get()) + + asyncTeardown: + let resetRes = await driver.reset() + if resetRes.isErr(): + assert false, resetRes.error + + (await driver.close()).expect("driver to close") asyncTest "Asynchronous queries": - let driverRes = PostgresDriver.new(dbUrl = storeMessageDbUrl, - maxConnections = 100) - - assert driverRes.isOk(), driverRes.error - - let driver = driverRes.value - discard await driver.reset() - var futures = newSeq[Future[ArchiveDriverResult[void]]](0) let beforeSleep = now() @@ -50,33 +58,9 @@ suite "Postgres driver": # connections and we spawn 100 tasks that spend ~1s each. require diff < 20 - (await driver.close()).expect("driver to close") - - asyncTest "Init database": - let driverRes = PostgresDriver.new(storeMessageDbUrl) - assert driverRes.isOk(), driverRes.error - - let driver = driverRes.value - discard await driver.reset() - - let initRes = await driver.init() - assert initRes.isOk(), initRes.error - - (await driver.close()).expect("driver to close") - asyncTest "Insert a message": const contentTopic = "test-content-topic" - let driverRes = PostgresDriver.new(storeMessageDbUrl) - assert driverRes.isOk(), driverRes.error - - let driver = driverRes.get() - - discard await driver.reset() - - let initRes = await driver.init() - assert initRes.isOk(), initRes.error - let msg = fakeWakuMessage(contentTopic=contentTopic) let computedDigest = computeDigest(msg) @@ -94,24 +78,12 @@ suite "Postgres driver": toHex(computedDigest.data) == toHex(digest) and toHex(actualMsg.payload) == toHex(msg.payload) - (await driver.close()).expect("driver to close") - asyncTest "Insert and query message": const contentTopic1 = "test-content-topic-1" const contentTopic2 = "test-content-topic-2" const pubsubTopic1 = "pubsubtopic-1" const pubsubTopic2 = "pubsubtopic-2" - let driverRes = PostgresDriver.new(storeMessageDbUrl) - assert driverRes.isOk(), driverRes.error - - let driver = driverRes.value - - discard await driver.reset() - - let initRes = await driver.init() - assert initRes.isOk(), initRes.error - let msg1 = fakeWakuMessage(contentTopic=contentTopic1) var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp) @@ -178,19 +150,8 @@ suite "Postgres driver": assert messagesRes.isOk(), messagesRes.error require messagesRes.get().len == 1 - (await driver.close()).expect("driver to close") - asyncTest "Insert true duplicated messages": # Validates that two completely equal messages can not be stored. - let driverRes = PostgresDriver.new(storeMessageDbUrl) - assert driverRes.isOk(), driverRes.error - - let driver = driverRes.value - - discard await driver.reset() - - let initRes = await driver.init() - assert initRes.isOk(), initRes.error let now = now() @@ -205,4 +166,3 @@ suite "Postgres driver": msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp) require not putRes.isOk() - (await driver.close()).expect("driver to close") diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index 2de13f079..649cc657e 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -7,11 +7,15 @@ import chronicles import ../../../waku/waku_archive, + ../../../waku/waku_archive/driver as driver_module, + ../../../waku/waku_archive/driver/builder, ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, ../testlib/common, - ../testlib/wakucore + ../testlib/wakucore, + ../testlib/testasync, + ../testlib/postgres logScope: @@ -24,17 +28,6 @@ logScope: # Initialize the random number generator common.randomize() -const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" - -proc newTestPostgresDriver(): ArchiveDriver = - let driver = PostgresDriver.new(dbUrl = storeMessageDbUrl).tryGet() - discard waitFor driver.reset() - - let initRes = waitFor driver.init() - assert initRes.isOk(), initRes.error - - return driver - proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor = ArchiveCursor( pubsubTopic: pubsubTopic, @@ -43,14 +36,28 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC digest: computeDigest(message) ) -suite "Postgres driver - query by content topic": +suite "Postgres driver - queries": + ## Unique driver instance + var driver {.threadvar.}: PostgresDriver + + asyncSetup: + let driverRes = await newTestPostgresDriver() + if driverRes.isErr(): + assert false, driverRes.error + + driver = PostgresDriver(driverRes.get()) + + asyncTeardown: + let resetRes = await driver.reset() + if resetRes.isErr(): + assert false, resetRes.error + + (await driver.close()).expect("driver to close") asyncTest "no content topic": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)), fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)), @@ -83,15 +90,10 @@ suite "Postgres driver - query by content topic": check: filteredMessages == expected[0..4] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "single content topic": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -126,15 +128,10 @@ suite "Postgres driver - query by content topic": check: filteredMessages == expected[2..3] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "single content topic - descending order": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -169,17 +166,12 @@ suite "Postgres driver - query by content topic": check: filteredMessages == expected[6..7].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "multiple content topic": ## Given const contentTopic1 = "test-content-topic-1" const contentTopic2 = "test-content-topic-2" const contentTopic3 = "test-content-topic-3" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -232,15 +224,10 @@ suite "Postgres driver - query by content topic": filteredMessages = res.tryGet().mapIt(it[1]) check filteredMessages == @[expected[2]] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "single content topic - no results": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)), fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)), @@ -270,15 +257,10 @@ suite "Postgres driver - query by content topic": check: filteredMessages.len == 0 - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "content topic and max page size - not enough messages stored": ## Given const pageSize: uint = 50 - let driver = newTestPostgresDriver() - for t in 0..<40: let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() @@ -297,18 +279,11 @@ suite "Postgres driver - query by content topic": check: filteredMessages.len == 40 - ## Cleanup - (await driver.close()).expect("driver to close") - -suite "Postgres driver - query by pubsub topic": - asyncTest "pubsub topic": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), @@ -344,16 +319,11 @@ suite "Postgres driver - query by pubsub topic": check: filteredMessages == expectedMessages[4..5] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "no pubsub topic": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), @@ -388,16 +358,11 @@ suite "Postgres driver - query by pubsub topic": check: filteredMessages == expectedMessages[0..1] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "content topic and pubsub topic": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))), (DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))), @@ -435,17 +400,10 @@ suite "Postgres driver - query by pubsub topic": check: filteredMessages == expectedMessages[4..5] - ## Cleanup - (await driver.close()).expect("driver to close") - -suite "Postgres driver - query by cursor": - asyncTest "only cursor": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -482,15 +440,10 @@ suite "Postgres driver - query by cursor": check: filteredMessages == expected[5..6] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "only cursor - descending order": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -527,15 +480,10 @@ suite "Postgres driver - query by cursor": check: filteredMessages == expected[2..3].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "content topic and cursor": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -571,15 +519,10 @@ suite "Postgres driver - query by cursor": check: filteredMessages == expected[5..6] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "content topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let expected = @[ fakeWakuMessage(@[byte 0], ts=ts(00)), fakeWakuMessage(@[byte 1], ts=ts(10)), @@ -615,16 +558,11 @@ suite "Postgres driver - query by cursor": check: filteredMessages == expected[2..5].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "pubsub topic and cursor": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), @@ -667,16 +605,11 @@ suite "Postgres driver - query by cursor": check: filteredMessages == expectedMessages[6..7] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "pubsub topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), @@ -719,17 +652,10 @@ suite "Postgres driver - query by cursor": check: filteredMessages == expectedMessages[4..5].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - -suite "Postgres driver - query by time range": - asyncTest "start time only": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -763,15 +689,10 @@ suite "Postgres driver - query by time range": check: filteredMessages == expected[2..6] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "end time only": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -805,16 +726,11 @@ suite "Postgres driver - query by time range": check: filteredMessages == expected[0..4] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "start time and end time": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), @@ -855,15 +771,10 @@ suite "Postgres driver - query by time range": check: filteredMessages == expectedMessages[2..4] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "invalid time range - no results": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -899,15 +810,10 @@ suite "Postgres driver - query by time range": check: filteredMessages.len == 0 - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range start and content topic": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -941,15 +847,10 @@ suite "Postgres driver - query by time range": check: filteredMessages == expected[2..6] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range start and content topic - descending order": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -986,15 +887,10 @@ suite "Postgres driver - query by time range": check: filteredMessages == expected[2..6].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range start, single content topic and cursor": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -1034,15 +930,10 @@ suite "Postgres driver - query by time range": check: filteredMessages == expected[4..9] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range start, single content topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -1082,16 +973,11 @@ suite "Postgres driver - query by time range": check: filteredMessages == expected[3..4].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range, content topic, pubsub topic and cursor": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ # start_time @@ -1136,16 +1022,11 @@ suite "Postgres driver - query by time range": check: filteredMessages == expectedMessages[3..4] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range, content topic, pubsub topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), @@ -1190,16 +1071,11 @@ suite "Postgres driver - query by time range": check: filteredMessages == expectedMessages[4..5].reversed() - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), @@ -1245,16 +1121,11 @@ suite "Postgres driver - query by time range": check: filteredMessages == expectedMessages[4..5] - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ (DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))), @@ -1299,16 +1170,9 @@ suite "Postgres driver - query by time range": check: filteredMessages.len == 0 - ## Cleanup - (await driver.close()).expect("driver to close") - -suite "Postgres driver - retention policy": - asyncTest "Get oldest and newest message timestamp": const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let oldestTime = ts(00, timeOrigin) let newestTime = ts(100, timeOrigin) @@ -1337,14 +1201,9 @@ suite "Postgres driver - retention policy": assert res.isOk(), res.error assert res.get() == newestTime, "Failed to retrieve the newest timestamp" - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "Delete messages older than certain timestamp": const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let targetTime = ts(40, timeOrigin) let expected = @[ @@ -1375,14 +1234,9 @@ suite "Postgres driver - retention policy": assert res.isOk(), res.error assert res.get() == 3, "Failed to retrieve the # of messages after deletion" - ## Cleanup - (await driver.close()).expect("driver to close") - asyncTest "Keep last n messages": const contentTopic = "test-content-topic" - let driver = newTestPostgresDriver() - let timeOrigin = now() let expected = @[ fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)), @@ -1412,5 +1266,9 @@ suite "Postgres driver - retention policy": assert res.isOk(), res.error assert res.get() == 2, "Failed to retrieve the # of messages after deletion" - ## Cleanup - (await driver.close()).expect("driver to close") + asyncTest "Exists table": + + var existsRes = await driver.existsTable("version") + assert existsRes.isOk(), existsRes.error + check existsRes.get() == true + diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 71fcd009d..4ec65425f 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -43,7 +43,7 @@ suite "Wakunode2 - App initialization": let res = wakunode2.setupPeerPersistence() ## Then - check res.isOk() + assert res.isOk(), $res.error test "node setup is successful with default configuration": ## Given diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index fb3826663..fccb14bd1 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -79,3 +79,6 @@ method decreaseDatabaseSize*(driver: ArchiveDriver, method close*(driver: ArchiveDriver): Future[ArchiveDriverResult[void]] {.base, async.} = discard +method existsTable*(driver: ArchiveDriver, tableName: string): + Future[ArchiveDriverResult[bool]] {.base, async.} = discard + diff --git a/waku/waku_archive/driver/builder.nim b/waku/waku_archive/driver/builder.nim index e853b63a6..c3f399c6e 100644 --- a/waku/waku_archive/driver/builder.nim +++ b/waku/waku_archive/driver/builder.nim @@ -15,6 +15,7 @@ import ../../common/error_handling, ./sqlite_driver, ./sqlite_driver/migrations as archive_driver_sqlite_migrations, + ./postgres_driver/migrations as archive_postgres_driver_migrations, ./queue_driver export @@ -31,7 +32,7 @@ proc new*(T: type ArchiveDriver, migrate: bool, maxNumConn: int, onFatalErrorAction: OnFatalErrorHandler): - Result[T, string] = + Future[Result[T, string]] {.async.} = ## url - string that defines the database ## vacuum - if true, a cleanup operation will be applied to the database ## migrate - if true, the database schema will be updated @@ -63,17 +64,25 @@ proc new*(T: type ArchiveDriver, let db = dbRes.get() # SQLite vacuum - let (pageSize, pageCount, freelistCount) = ? db.gatherSqlitePageStats() + let sqliteStatsRes = db.gatherSqlitePageStats() + if sqliteStatsRes.isErr(): + return err("error while gathering sqlite stats: " & $sqliteStatsRes.error) + + let (pageSize, pageCount, freelistCount) = sqliteStatsRes.get() debug "sqlite database page stats", pageSize = pageSize, pages = pageCount, freePages = freelistCount if vacuum and (pageCount > 0 and freelistCount > 0): - ? db.performSqliteVacuum() + let vacuumRes = db.performSqliteVacuum() + if vacuumRes.isErr(): + return err("error in vacuum sqlite: " & $vacuumRes.error) # Database migration if migrate: - ? archive_driver_sqlite_migrations.migrate(db) + let migrateRes = archive_driver_sqlite_migrations.migrate(db) + if migrateRes.isErr(): + return err("error in migrate sqlite: " & $migrateRes.error) debug "setting up sqlite waku archive driver" let res = SqliteDriver.new(db) @@ -92,13 +101,11 @@ proc new*(T: type ArchiveDriver, let driver = res.get() - try: - # The table should exist beforehand. - let newTableRes = waitFor driver.createMessageTable() - if newTableRes.isErr(): - return err("error creating table: " & newTableRes.error) - except CatchableError: - return err("exception creating table: " & getCurrentExceptionMsg()) + # Database migration + if migrate: + let migrateRes = await archive_postgres_driver_migrations.migrate(driver) + if migrateRes.isErr(): + return err("ArchiveDriver build failed in migration: " & $migrateRes.error) return ok(driver) diff --git a/waku/waku_archive/driver/postgres_driver/migrations.nim b/waku/waku_archive/driver/postgres_driver/migrations.nim new file mode 100644 index 000000000..74cfb4530 --- /dev/null +++ b/waku/waku_archive/driver/postgres_driver/migrations.nim @@ -0,0 +1,95 @@ +{.push raises: [].} + +import + std/[tables, strutils, os], + stew/results, + chronicles, + chronos +import + ../../../common/databases/common, + ../../../../migrations/message_store_postgres/pg_migration_manager, + ../postgres_driver + +logScope: + topics = "waku archive migration" + +const SchemaVersion* = 1 # increase this when there is an update in the database schema + +proc breakIntoStatements*(script: string): seq[string] = + ## Given a full migration script, that can potentially contain a list + ## of SQL statements, this proc splits it into the contained isolated statements + ## that should be executed one after the other. + var statements = newSeq[string]() + + let lines = script.split('\n') + + var simpleStmt: string + var plSqlStatement: string + var insidePlSqlScript = false + for line in lines: + if line.strip().len == 0: + continue + + if insidePlSqlScript: + if line.contains("END $$"): + ## End of the Pl/SQL script + plSqlStatement &= line + statements.add(plSqlStatement) + plSqlStatement = "" + insidePlSqlScript = false + continue + + else: + plSqlStatement &= line & "\n" + + if line.contains("DO $$"): + ## Beginning of the Pl/SQL script + insidePlSqlScript = true + plSqlStatement &= line & "\n" + + if not insidePlSqlScript: + if line.contains(';'): + ## End of simple statement + simpleStmt &= line + statements.add(simpleStmt) + simpleStmt = "" + else: + simpleStmt &= line & "\n" + + return statements + +proc migrate*(driver: PostgresDriver, + targetVersion = SchemaVersion): + Future[DatabaseResult[void]] {.async.} = + + debug "starting message store's postgres database migration" + + let currentVersion = (await driver.getCurrentVersion()).valueOr: + return err("migrate error could not retrieve current version: " & $error) + + if currentVersion == targetVersion: + debug "database schema is up to date", + currentVersion=currentVersion, targetVersion=targetVersion + return ok() + + info "database schema is outdated", currentVersion=currentVersion, targetVersion=targetVersion + + # Load migration scripts + let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion) + + # Run the migration scripts + for script in scripts: + + for statement in script.breakIntoStatements(): + debug "executing migration statement", statement=statement + + (await driver.performWriteQuery(statement)).isOkOr: + error "failed to execute migration statement", statement=statement, error=error + return err("failed to execute migration statement") + + debug "migration statement executed succesfully", statement=statement + + debug "finished message store's postgres database migration" + + return ok() + diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 443eb0f38..71efa491d 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[nre,options,sequtils,strutils,times], + std/[nre,options,sequtils,strutils,times,strformat], stew/[results,byteutils], db_postgres, postgres, @@ -28,6 +28,9 @@ type PostgresDriver* = ref object of ArchiveDriver proc dropTableQuery(): string = "DROP TABLE messages" +proc dropVersionTableQuery(): string = + "DROP TABLE version" + proc createTableQuery(): string = "CREATE TABLE IF NOT EXISTS messages (" & " pubsubTopic VARCHAR NOT NULL," & @@ -111,6 +114,15 @@ proc new*(T: type PostgresDriver, return ok(PostgresDriver(writeConnPool: writeConnPool, readConnPool: readConnPool)) +proc performWriteQuery*(s: PostgresDriver, + query: string): Future[ArchiveDriverResult[void]] {.async.} = + ## Executes a query that changes the database state + ## TODO: we can reduce the code a little with this proc + (await s.writeConnPool.pgQuery(query)).isOkOr: + return err(fmt"error in {query}: {error}") + + return ok() + proc createMessageTable*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = @@ -120,7 +132,7 @@ proc createMessageTable*(s: PostgresDriver): return ok() -proc deleteMessageTable*(s: PostgresDriver): +proc deleteMessageTable(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = let execRes = await s.writeConnPool.pgQuery(dropTableQuery()) @@ -129,18 +141,22 @@ proc deleteMessageTable*(s: PostgresDriver): return ok() -proc init*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = +proc deleteVersionTable(s: PostgresDriver): + Future[ArchiveDriverResult[void]] {.async.} = - let createMsgRes = await s.createMessageTable() - if createMsgRes.isErr(): - return err("createMsgRes.isErr in init: " & createMsgRes.error) + let execRes = await s.writeConnPool.pgQuery(dropVersionTableQuery()) + if execRes.isErr(): + return err("error in deleteVersionTable: " & execRes.error) return ok() proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = - - let ret = await s.deleteMessageTable() - return ret + ## This is only used for testing purposes, to set a fresh database at the beginning of each test + (await s.deleteMessageTable()).isOkOr: + return err("error deleting message table: " & $error) + (await s.deleteVersionTable()).isOkOr: + return err("error deleting version table: " & $error) + return ok() proc rowCallbackImpl(pqResult: ptr PGresult, outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) = @@ -570,3 +586,45 @@ proc sleep*(s: PostgresDriver, seconds: int): return err("exception sleeping: " & getCurrentExceptionMsg()) return ok() + +method existsTable*(s: PostgresDriver, tableName: string): + Future[ArchiveDriverResult[bool]] {.async.} = + let query: string = fmt""" + SELECT EXISTS ( + SELECT FROM + pg_tables + WHERE + tablename = '{tableName}' + ); + """ + + var exists: string + proc rowCallback(pqResult: ptr PGresult) = + if pqResult.pqnfields() != 1: + error "Wrong number of fields in existsTable" + return + + if pqResult.pqNtuples() != 1: + error "Wrong number of rows in existsTable" + return + + exists = $(pqgetvalue(pqResult, 0, 0)) + + (await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr: + return err("existsTable failed in getRow: " & $error) + + return ok(exists == "t") + +proc getCurrentVersion*(s: PostgresDriver): + Future[ArchiveDriverResult[int64]] {.async.} = + + let existsVersionTable = (await s.existsTable("version")).valueOr: + return err("error in getCurrentVersion-existsTable: " & $error) + + if not existsVersionTable: + return ok(0) + + let res = (await s.getInt(fmt"SELECT version FROM version")).valueOr: + return err("error in getMessagesCount: " & $error) + + return ok(res) diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 2db8a3d84..27d9ae963 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -240,6 +240,10 @@ method getAllMessages*(driver: QueueDriver): # TODO: Implement this message_store method return err("interface method not implemented") +method existsTable*(driver: QueueDriver, tableName: string): + Future[ArchiveDriverResult[bool]] {.async.} = + return err("interface method not implemented") + method getMessages*(driver: QueueDriver, contentTopic: seq[ContentTopic] = @[], pubsubTopic = none(PubsubTopic), diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 88d1974ef..5a74928c3 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -189,3 +189,7 @@ method close*(s: SqliteDriver): s.db.close() return ok() +method existsTable*(s: SqliteDriver, tableName: string): + Future[ArchiveDriverResult[bool]] {.async.} = + return err("existsTable method not implemented in sqlite_driver") +