chore: Postgres migrations (#2477)

* Add postgres_driver/migrations.nim
* Postgres and archive logic adaptation to the migration implementation
* libwaku: adapt node_lifecycle_request.nim to migration refactoring
* test_app.nim: add more detail for test that only fails in CI
* postgres migrations: store the migration scripts inside the resulting wakunode binary instead of external .sql files.
This commit is contained in:
Ivan FB 2024-03-01 12:05:27 +01:00 committed by GitHub
parent 88ff928213
commit 560f949a8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 402 additions and 255 deletions

View File

@ -493,7 +493,7 @@ proc setupProtocols(node: WakuNode,
if conf.store:
# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
let archiveDriverRes = waitFor ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections,

View File

@ -70,11 +70,11 @@ proc configureStore(node: WakuNode,
discard
# Archive setup
let archiveDriverRes = ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onFatalErrorAction)
let archiveDriverRes = await ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

View File

@ -0,0 +1,20 @@
const ContentScriptVersion_1* = """
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash)
);
CREATE TABLE iF NOT EXISTS version (
version INTEGER NOT NULL
);
INSERT INTO version (version) VALUES(1);
"""

View File

@ -0,0 +1,68 @@
const ContentScriptVersion_2* = """
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
) PARTITION BY RANGE (storedAt);
DO $$
DECLARE
min_storedAt numeric;
max_storedAt numeric;
min_storedAtSeconds integer = 0;
max_storedAtSeconds integer = 0;
partition_name TEXT;
create_partition_stmt TEXT;
BEGIN
SELECT MIN(storedAt) into min_storedAt
FROM messages_backup;
SELECT MAX(storedAt) into max_storedAt
FROM messages_backup;
min_storedAtSeconds := min_storedAt / 1000000000;
max_storedAtSeconds := max_storedAt / 1000000000;
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
' PARTITION OF messages FOR VALUES FROM (' ||
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
END IF;
END $$;
INSERT INTO messages (
pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
)
SELECT pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
FROM messages_backup;
DROP TABLE messages_backup;
UPDATE version SET version = 2 WHERE version = 1;
"""

View File

@ -0,0 +1,37 @@
import
content_script_version_1,
content_script_version_2
type
MigrationScript* = object
version*: int
scriptContent*: string
proc init*(T: type MigrationScript,
targetVersion: int,
scriptContent: string): T =
return MigrationScript(
targetVersion: targetVersion,
scriptContent: scriptContent)
const PgMigrationScripts* = @[
MigrationScript(
version: 1,
scriptContent: ContentScriptVersion_1),
MigrationScript(
version: 2,
scriptContent: ContentScriptVersion_2)
]
proc getMigrationScripts*(currentVersion: int64,
targetVersion: int64): seq[string] =
var ret = newSeq[string]()
var v = currentVersion
while v < targetVersion:
ret.add(PgMigrationScripts[v].scriptContent)
v.inc()
return ret

View File

@ -0,0 +1,33 @@
import
chronicles,
chronos
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
../../../waku/waku_archive/driver/builder,
../../../waku/waku_archive/driver/postgres_driver
const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"
proc newTestPostgresDriver*(): Future[Result[ArchiveDriver, string]] {.async.} =
proc onErr(errMsg: string) {.gcsafe, closure.} =
error "error creating ArchiveDriver", error = errMsg
quit(QuitFailure)
let
vacuum = false
migrate = true
maxNumConn = 50
let driverRes = await ArchiveDriver.new(storeMessageDbUrl,
vacuum,
migrate,
maxNumConn,
onErr)
if driverRes.isErr():
onErr("could not create archive driver: " & driverRes.error)
return ok(driverRes.get())

View File

@ -9,7 +9,9 @@ import
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/wakucore
../testlib/wakucore,
../testlib/testasync,
../testlib/postgres
proc now():int64 = getTime().toUnix()
@ -24,18 +26,24 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
)
suite "Postgres driver":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver
const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"
asyncSetup:
let driverRes = await newTestPostgresDriver()
if driverRes.isErr():
assert false, driverRes.error
driver = PostgresDriver(driverRes.get())
asyncTeardown:
let resetRes = await driver.reset()
if resetRes.isErr():
assert false, resetRes.error
(await driver.close()).expect("driver to close")
asyncTest "Asynchronous queries":
let driverRes = PostgresDriver.new(dbUrl = storeMessageDbUrl,
maxConnections = 100)
assert driverRes.isOk(), driverRes.error
let driver = driverRes.value
discard await driver.reset()
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)
let beforeSleep = now()
@ -50,33 +58,9 @@ suite "Postgres driver":
# connections and we spawn 100 tasks that spend ~1s each.
require diff < 20
(await driver.close()).expect("driver to close")
asyncTest "Init database":
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error
let driver = driverRes.value
discard await driver.reset()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error
(await driver.close()).expect("driver to close")
asyncTest "Insert a message":
const contentTopic = "test-content-topic"
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error
let driver = driverRes.get()
discard await driver.reset()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error
let msg = fakeWakuMessage(contentTopic=contentTopic)
let computedDigest = computeDigest(msg)
@ -94,24 +78,12 @@ suite "Postgres driver":
toHex(computedDigest.data) == toHex(digest) and
toHex(actualMsg.payload) == toHex(msg.payload)
(await driver.close()).expect("driver to close")
asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const pubsubTopic1 = "pubsubtopic-1"
const pubsubTopic2 = "pubsubtopic-2"
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error
let driver = driverRes.value
discard await driver.reset()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error
let msg1 = fakeWakuMessage(contentTopic=contentTopic1)
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp)
@ -178,19 +150,8 @@ suite "Postgres driver":
assert messagesRes.isOk(), messagesRes.error
require messagesRes.get().len == 1
(await driver.close()).expect("driver to close")
asyncTest "Insert true duplicated messages":
# Validates that two completely equal messages can not be stored.
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error
let driver = driverRes.value
discard await driver.reset()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error
let now = now()
@ -205,4 +166,3 @@ suite "Postgres driver":
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()
(await driver.close()).expect("driver to close")

View File

@ -7,11 +7,15 @@ import
chronicles
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
../../../waku/waku_archive/driver/builder,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/common,
../testlib/wakucore
../testlib/wakucore,
../testlib/testasync,
../testlib/postgres
logScope:
@ -24,17 +28,6 @@ logScope:
# Initialize the random number generator
common.randomize()
const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"
proc newTestPostgresDriver(): ArchiveDriver =
let driver = PostgresDriver.new(dbUrl = storeMessageDbUrl).tryGet()
discard waitFor driver.reset()
let initRes = waitFor driver.init()
assert initRes.isOk(), initRes.error
return driver
proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
@ -43,14 +36,28 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
digest: computeDigest(message)
)
suite "Postgres driver - query by content topic":
suite "Postgres driver - queries":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver
asyncSetup:
let driverRes = await newTestPostgresDriver()
if driverRes.isErr():
assert false, driverRes.error
driver = PostgresDriver(driverRes.get())
asyncTeardown:
let resetRes = await driver.reset()
if resetRes.isErr():
assert false, resetRes.error
(await driver.close()).expect("driver to close")
asyncTest "no content topic":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)),
fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)),
@ -83,15 +90,10 @@ suite "Postgres driver - query by content topic":
check:
filteredMessages == expected[0..4]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "single content topic":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -126,15 +128,10 @@ suite "Postgres driver - query by content topic":
check:
filteredMessages == expected[2..3]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -169,17 +166,12 @@ suite "Postgres driver - query by content topic":
check:
filteredMessages == expected[6..7].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "multiple content topic":
## Given
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const contentTopic3 = "test-content-topic-3"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -232,15 +224,10 @@ suite "Postgres driver - query by content topic":
filteredMessages = res.tryGet().mapIt(it[1])
check filteredMessages == @[expected[2]]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "single content topic - no results":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=DefaultContentTopic, ts=ts(00)),
fakeWakuMessage(@[byte 1], contentTopic=DefaultContentTopic, ts=ts(10)),
@ -270,15 +257,10 @@ suite "Postgres driver - query by content topic":
check:
filteredMessages.len == 0
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "content topic and max page size - not enough messages stored":
## Given
const pageSize: uint = 50
let driver = newTestPostgresDriver()
for t in 0..<40:
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
@ -297,18 +279,11 @@ suite "Postgres driver - query by content topic":
check:
filteredMessages.len == 40
## Cleanup
(await driver.close()).expect("driver to close")
suite "Postgres driver - query by pubsub topic":
asyncTest "pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))),
(DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))),
@ -344,16 +319,11 @@ suite "Postgres driver - query by pubsub topic":
check:
filteredMessages == expectedMessages[4..5]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "no pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))),
(DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))),
@ -388,16 +358,11 @@ suite "Postgres driver - query by pubsub topic":
check:
filteredMessages == expectedMessages[0..1]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "content topic and pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00))),
(DefaultPubsubTopic, fakeWakuMessage(@[byte 1], ts=ts(10))),
@ -435,17 +400,10 @@ suite "Postgres driver - query by pubsub topic":
check:
filteredMessages == expectedMessages[4..5]
## Cleanup
(await driver.close()).expect("driver to close")
suite "Postgres driver - query by cursor":
asyncTest "only cursor":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -482,15 +440,10 @@ suite "Postgres driver - query by cursor":
check:
filteredMessages == expected[5..6]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "only cursor - descending order":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -527,15 +480,10 @@ suite "Postgres driver - query by cursor":
check:
filteredMessages == expected[2..3].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "content topic and cursor":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -571,15 +519,10 @@ suite "Postgres driver - query by cursor":
check:
filteredMessages == expected[5..6]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "content topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00)),
fakeWakuMessage(@[byte 1], ts=ts(10)),
@ -615,16 +558,11 @@ suite "Postgres driver - query by cursor":
check:
filteredMessages == expected[2..5].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "pubsub topic and cursor":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))),
@ -667,16 +605,11 @@ suite "Postgres driver - query by cursor":
check:
filteredMessages == expectedMessages[6..7]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "pubsub topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))),
@ -719,17 +652,10 @@ suite "Postgres driver - query by cursor":
check:
filteredMessages == expectedMessages[4..5].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
suite "Postgres driver - query by time range":
asyncTest "start time only":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -763,15 +689,10 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expected[2..6]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "end time only":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -805,16 +726,11 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expected[0..4]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "start time and end time":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))),
@ -855,15 +771,10 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expectedMessages[2..4]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "invalid time range - no results":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -899,15 +810,10 @@ suite "Postgres driver - query by time range":
check:
filteredMessages.len == 0
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range start and content topic":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -941,15 +847,10 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expected[2..6]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range start and content topic - descending order":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -986,15 +887,10 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expected[2..6].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range start, single content topic and cursor":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -1034,15 +930,10 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expected[4..9]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range start, single content topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -1082,16 +973,11 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expected[3..4].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range, content topic, pubsub topic and cursor":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
# start_time
@ -1136,16 +1022,11 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expectedMessages[3..4]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range, content topic, pubsub topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))),
@ -1190,16 +1071,11 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expectedMessages[4..5].reversed()
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))),
@ -1245,16 +1121,11 @@ suite "Postgres driver - query by time range":
check:
filteredMessages == expectedMessages[4..5]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
(DefaultPubsubTopic, fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin))),
@ -1299,16 +1170,9 @@ suite "Postgres driver - query by time range":
check:
filteredMessages.len == 0
## Cleanup
(await driver.close()).expect("driver to close")
suite "Postgres driver - retention policy":
asyncTest "Get oldest and newest message timestamp":
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let oldestTime = ts(00, timeOrigin)
let newestTime = ts(100, timeOrigin)
@ -1337,14 +1201,9 @@ suite "Postgres driver - retention policy":
assert res.isOk(), res.error
assert res.get() == newestTime, "Failed to retrieve the newest timestamp"
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "Delete messages older than certain timestamp":
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let targetTime = ts(40, timeOrigin)
let expected = @[
@ -1375,14 +1234,9 @@ suite "Postgres driver - retention policy":
assert res.isOk(), res.error
assert res.get() == 3, "Failed to retrieve the # of messages after deletion"
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "Keep last n messages":
const contentTopic = "test-content-topic"
let driver = newTestPostgresDriver()
let timeOrigin = now()
let expected = @[
fakeWakuMessage(@[byte 0], contentTopic=contentTopic, ts=ts(00, timeOrigin)),
@ -1412,5 +1266,9 @@ suite "Postgres driver - retention policy":
assert res.isOk(), res.error
assert res.get() == 2, "Failed to retrieve the # of messages after deletion"
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "Exists table":
var existsRes = await driver.existsTable("version")
assert existsRes.isOk(), existsRes.error
check existsRes.get() == true

View File

@ -43,7 +43,7 @@ suite "Wakunode2 - App initialization":
let res = wakunode2.setupPeerPersistence()
## Then
check res.isOk()
assert res.isOk(), $res.error
test "node setup is successful with default configuration":
## Given

View File

@ -79,3 +79,6 @@ method decreaseDatabaseSize*(driver: ArchiveDriver,
method close*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard
method existsTable*(driver: ArchiveDriver, tableName: string):
Future[ArchiveDriverResult[bool]] {.base, async.} = discard

View File

@ -15,6 +15,7 @@ import
../../common/error_handling,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./postgres_driver/migrations as archive_postgres_driver_migrations,
./queue_driver
export
@ -31,7 +32,7 @@ proc new*(T: type ArchiveDriver,
migrate: bool,
maxNumConn: int,
onFatalErrorAction: OnFatalErrorHandler):
Result[T, string] =
Future[Result[T, string]] {.async.} =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
## migrate - if true, the database schema will be updated
@ -63,17 +64,25 @@ proc new*(T: type ArchiveDriver,
let db = dbRes.get()
# SQLite vacuum
let (pageSize, pageCount, freelistCount) = ? db.gatherSqlitePageStats()
let sqliteStatsRes = db.gatherSqlitePageStats()
if sqliteStatsRes.isErr():
return err("error while gathering sqlite stats: " & $sqliteStatsRes.error)
let (pageSize, pageCount, freelistCount) = sqliteStatsRes.get()
debug "sqlite database page stats", pageSize = pageSize,
pages = pageCount,
freePages = freelistCount
if vacuum and (pageCount > 0 and freelistCount > 0):
? db.performSqliteVacuum()
let vacuumRes = db.performSqliteVacuum()
if vacuumRes.isErr():
return err("error in vacuum sqlite: " & $vacuumRes.error)
# Database migration
if migrate:
? archive_driver_sqlite_migrations.migrate(db)
let migrateRes = archive_driver_sqlite_migrations.migrate(db)
if migrateRes.isErr():
return err("error in migrate sqlite: " & $migrateRes.error)
debug "setting up sqlite waku archive driver"
let res = SqliteDriver.new(db)
@ -92,13 +101,11 @@ proc new*(T: type ArchiveDriver,
let driver = res.get()
try:
# The table should exist beforehand.
let newTableRes = waitFor driver.createMessageTable()
if newTableRes.isErr():
return err("error creating table: " & newTableRes.error)
except CatchableError:
return err("exception creating table: " & getCurrentExceptionMsg())
# Database migration
if migrate:
let migrateRes = await archive_postgres_driver_migrations.migrate(driver)
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)
return ok(driver)

View File

@ -0,0 +1,95 @@
{.push raises: [].}
import
std/[tables, strutils, os],
stew/results,
chronicles,
chronos
import
../../../common/databases/common,
../../../../migrations/message_store_postgres/pg_migration_manager,
../postgres_driver
logScope:
topics = "waku archive migration"
const SchemaVersion* = 1 # increase this when there is an update in the database schema
proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
## of SQL statements, this proc splits it into the contained isolated statements
## that should be executed one after the other.
var statements = newSeq[string]()
let lines = script.split('\n')
var simpleStmt: string
var plSqlStatement: string
var insidePlSqlScript = false
for line in lines:
if line.strip().len == 0:
continue
if insidePlSqlScript:
if line.contains("END $$"):
## End of the Pl/SQL script
plSqlStatement &= line
statements.add(plSqlStatement)
plSqlStatement = ""
insidePlSqlScript = false
continue
else:
plSqlStatement &= line & "\n"
if line.contains("DO $$"):
## Beginning of the Pl/SQL script
insidePlSqlScript = true
plSqlStatement &= line & "\n"
if not insidePlSqlScript:
if line.contains(';'):
## End of simple statement
simpleStmt &= line
statements.add(simpleStmt)
simpleStmt = ""
else:
simpleStmt &= line & "\n"
return statements
proc migrate*(driver: PostgresDriver,
targetVersion = SchemaVersion):
Future[DatabaseResult[void]] {.async.} =
debug "starting message store's postgres database migration"
let currentVersion = (await driver.getCurrentVersion()).valueOr:
return err("migrate error could not retrieve current version: " & $error)
if currentVersion == targetVersion:
debug "database schema is up to date",
currentVersion=currentVersion, targetVersion=targetVersion
return ok()
info "database schema is outdated", currentVersion=currentVersion, targetVersion=targetVersion
# Load migration scripts
let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion)
# Run the migration scripts
for script in scripts:
for statement in script.breakIntoStatements():
debug "executing migration statement", statement=statement
(await driver.performWriteQuery(statement)).isOkOr:
error "failed to execute migration statement", statement=statement, error=error
return err("failed to execute migration statement")
debug "migration statement executed succesfully", statement=statement
debug "finished message store's postgres database migration"
return ok()

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[nre,options,sequtils,strutils,times],
std/[nre,options,sequtils,strutils,times,strformat],
stew/[results,byteutils],
db_postgres,
postgres,
@ -28,6 +28,9 @@ type PostgresDriver* = ref object of ArchiveDriver
proc dropTableQuery(): string =
"DROP TABLE messages"
proc dropVersionTableQuery(): string =
"DROP TABLE version"
proc createTableQuery(): string =
"CREATE TABLE IF NOT EXISTS messages (" &
" pubsubTopic VARCHAR NOT NULL," &
@ -111,6 +114,15 @@ proc new*(T: type PostgresDriver,
return ok(PostgresDriver(writeConnPool: writeConnPool,
readConnPool: readConnPool))
proc performWriteQuery*(s: PostgresDriver,
query: string): Future[ArchiveDriverResult[void]] {.async.} =
## Executes a query that changes the database state
## TODO: we can reduce the code a little with this proc
(await s.writeConnPool.pgQuery(query)).isOkOr:
return err(fmt"error in {query}: {error}")
return ok()
proc createMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
@ -120,7 +132,7 @@ proc createMessageTable*(s: PostgresDriver):
return ok()
proc deleteMessageTable*(s: PostgresDriver):
proc deleteMessageTable(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
let execRes = await s.writeConnPool.pgQuery(dropTableQuery())
@ -129,18 +141,22 @@ proc deleteMessageTable*(s: PostgresDriver):
return ok()
proc init*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
proc deleteVersionTable(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
let createMsgRes = await s.createMessageTable()
if createMsgRes.isErr():
return err("createMsgRes.isErr in init: " & createMsgRes.error)
let execRes = await s.writeConnPool.pgQuery(dropVersionTableQuery())
if execRes.isErr():
return err("error in deleteVersionTable: " & execRes.error)
return ok()
proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
let ret = await s.deleteMessageTable()
return ret
## This is only used for testing purposes, to set a fresh database at the beginning of each test
(await s.deleteMessageTable()).isOkOr:
return err("error deleting message table: " & $error)
(await s.deleteVersionTable()).isOkOr:
return err("error deleting version table: " & $error)
return ok()
proc rowCallbackImpl(pqResult: ptr PGresult,
outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) =
@ -570,3 +586,45 @@ proc sleep*(s: PostgresDriver, seconds: int):
return err("exception sleeping: " & getCurrentExceptionMsg())
return ok()
method existsTable*(s: PostgresDriver, tableName: string):
Future[ArchiveDriverResult[bool]] {.async.} =
let query: string = fmt"""
SELECT EXISTS (
SELECT FROM
pg_tables
WHERE
tablename = '{tableName}'
);
"""
var exists: string
proc rowCallback(pqResult: ptr PGresult) =
if pqResult.pqnfields() != 1:
error "Wrong number of fields in existsTable"
return
if pqResult.pqNtuples() != 1:
error "Wrong number of rows in existsTable"
return
exists = $(pqgetvalue(pqResult, 0, 0))
(await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr:
return err("existsTable failed in getRow: " & $error)
return ok(exists == "t")
proc getCurrentVersion*(s: PostgresDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
let existsVersionTable = (await s.existsTable("version")).valueOr:
return err("error in getCurrentVersion-existsTable: " & $error)
if not existsVersionTable:
return ok(0)
let res = (await s.getInt(fmt"SELECT version FROM version")).valueOr:
return err("error in getMessagesCount: " & $error)
return ok(res)

View File

@ -240,6 +240,10 @@ method getAllMessages*(driver: QueueDriver):
# TODO: Implement this message_store method
return err("interface method not implemented")
method existsTable*(driver: QueueDriver, tableName: string):
Future[ArchiveDriverResult[bool]] {.async.} =
return err("interface method not implemented")
method getMessages*(driver: QueueDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),

View File

@ -189,3 +189,7 @@ method close*(s: SqliteDriver):
s.db.close()
return ok()
method existsTable*(s: SqliteDriver, tableName: string):
Future[ArchiveDriverResult[bool]] {.async.} =
return err("existsTable method not implemented in sqlite_driver")