From 88b8e1867a238cd83eaa34b714269f77202e4106 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Fri, 31 May 2024 12:08:16 -0400 Subject: [PATCH] fix(waku_archive): only allow a single instance to execute migrations (#2736) --- .../driver/postgres_driver/migrations.nim | 10 ++++++ .../postgres_driver/postgres_driver.nim | 36 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/waku/waku_archive/driver/postgres_driver/migrations.nim b/waku/waku_archive/driver/postgres_driver/migrations.nim index 254decd98..1a52eeabc 100644 --- a/waku/waku_archive/driver/postgres_driver/migrations.nim +++ b/waku/waku_archive/driver/postgres_driver/migrations.nim @@ -72,6 +72,16 @@ proc migrate*( # Load migration scripts let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion) + # Lock the db + (await driver.acquireDatabaseLock()).isOkOr: + error "failed to acquire lock", error = error + return err("failed to lock the db") + + defer: + (await driver.releaseDatabaseLock()).isOkOr: + error "failed to release lock", error = error + return err("failed to unlock the db.") + # Run the migration scripts for script in scripts: for statement in script.breakIntoStatements(): diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 99196f5c0..4b646bb72 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -853,6 +853,42 @@ proc sleep*( return ok() +proc acquireDatabaseLock*( + s: PostgresDriver, lockId: int = 841886 +): Future[ArchiveDriverResult[void]] {.async.} = + ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time) + let locked = ( + await s.getStr( + fmt""" + SELECT pg_try_advisory_lock({lockId}) + """ + ) + ).valueOr: + return err("error acquiring a lock: " & error) + + if locked == "f": + return err("another waku instance is currently executing a migration") + + return ok() + +proc releaseDatabaseLock*( + s: PostgresDriver, lockId: int = 841886 +): Future[ArchiveDriverResult[void]] {.async.} = + ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time) + let unlocked = ( + await s.getStr( + fmt""" + SELECT pg_advisory_unlock({lockId}) + """ + ) + ).valueOr: + return err("error releasing a lock: " & error) + + if unlocked == "f": + return err("could not release advisory lock") + + return ok() + proc performWriteQuery*( s: PostgresDriver, query: string ): Future[ArchiveDriverResult[void]] {.async.} =