diff --git a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim index 347cddaa3..3ecf88fa1 100644 --- a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim +++ b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim @@ -70,6 +70,9 @@ proc addPartitionInfo*( trace "Adding partition info" self.partitions.addLast(partitionInfo) +proc clearPartitionInfo*(self: PartitionManager) = + self.partitions.clear() + proc removeOldestPartitionName*(self: PartitionManager) = ## Simply removed the partition from the tracked/known partitions queue. ## Just remove it and ignore it. diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 00bee087b..516d8d70e 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -866,6 +866,11 @@ proc acquireDatabaseLock*( s: PostgresDriver, lockId: int = 841886 ): Future[ArchiveDriverResult[void]] {.async.} = ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time) + ## This should only be used in the migrations module because this approach doesn't ensure + ## that the lock is acquired/released by the same connection. The preferable "lock" + ## approach is using the "performWriteQueryWithLock" proc. However, we can't use + ## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/pgSQL + ## scripts. 
let locked = ( await s.getStr( fmt""" @@ -908,6 +913,46 @@ proc performWriteQuery*( return ok() +const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock" + +proc performWriteQueryWithLock*( + self: PostgresDriver, queryToProtect: string +): Future[ArchiveDriverResult[void]] {.async.} = + ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it + debug "performWriteQueryWithLock", queryToProtect + let query = + fmt""" + DO $$ + DECLARE + lock_acquired boolean; + BEGIN + -- Try to acquire the advisory lock + lock_acquired := pg_try_advisory_lock(123456789); + + IF NOT lock_acquired THEN + RAISE EXCEPTION '{COULD_NOT_ACQUIRE_ADVISORY_LOCK}'; + END IF; + + -- Perform the query + BEGIN + {queryToProtect} + EXCEPTION WHEN OTHERS THEN + -- Ensure the lock is released if an error occurs + PERFORM pg_advisory_unlock(123456789); + RAISE; + END; + + -- Release the advisory lock after the query completes successfully + PERFORM pg_advisory_unlock(123456789); + END $$; +""" + (await self.performWriteQuery(query)).isOkOr: + debug "protected query ended with error", error = $error + return err("protected query ended with error:" & $error) + + debug "protected query ended correctly" + return ok() + proc addPartition( self: PostgresDriver, startTime: Timestamp ): Future[ArchiveDriverResult[void]] {.async.} = @@ -930,21 +975,15 @@ proc addPartition( "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " & "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');" - # Lock the db - (await self.acquireDatabaseLock()).isOkOr: - error "failed to acquire lock", error = error - return err("failed to lock the db") - - defer: - (await self.releaseDatabaseLock()).isOkOr: - error "failed to release lock", error = error - return err("failed to unlock the db.") - - (await self.performWriteQuery(createPartitionQuery)).isOkOr: + (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr: if 
error.contains("already exists"): debug "skip create new partition as it already exists: ", skipped_error = $error return ok() + if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK): + debug "skip create new partition because the advisory lock is acquired by other" + return ok() + ## for any different error, just consider it return err(fmt"error adding partition [{partitionName}]: " & $error) @@ -953,9 +992,12 @@ proc addPartition( self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`) return ok() -proc initializePartitionsInfo( +proc refreshPartitionsInfo( self: PostgresDriver ): Future[ArchiveDriverResult[void]] {.async.} = + debug "refreshPartitionsInfo" + self.partitionMngr.clearPartitionInfo() + let partitionNamesRes = await self.getPartitionsList() if not partitionNamesRes.isOk(): return err("Could not retrieve partitions list: " & $partitionNamesRes.error) @@ -994,13 +1036,13 @@ proc loopPartitionFactory( debug "starting loopPartitionFactory" - ## First of all, let's make the 'partition_manager' aware of the current partitions - (await self.initializePartitionsInfo()).isOkOr: - onFatalError("issue in loopPartitionFactory: " & $error) - while true: trace "Check if we need to create a new partition" + ## Let's make the 'partition_manager' aware of the current partitions + (await self.refreshPartitionsInfo()).isOkOr: + onFatalError("issue in loopPartitionFactory: " & $error) + let now = times.now().toTime().toUnix() if self.partitionMngr.isEmpty():