From 9576b77c7622873699f8d6d6be83df1a72ba4d40 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Tue, 9 Jul 2024 13:41:29 +0200 Subject: [PATCH] postgres_driver: better partition creation without exclusive access (#2887) --- .../postgres_driver/postgres_driver.nim | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index ce7135b4e..48e06409a 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -915,7 +915,9 @@ const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock" proc performWriteQueryWithLock*( self: PostgresDriver, queryToProtect: string ): Future[ArchiveDriverResult[void]] {.async.} = - ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it + ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it. + ## The purpose of this proc is to protect write queries that might be performed simultaneously + ## to the same database, from different store nodes. debug "performWriteQueryWithLock", queryToProtect let query = fmt""" @@ -944,6 +946,11 @@ proc performWriteQueryWithLock*( END $$; """ (await self.performWriteQuery(query)).isOkOr: + if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK): + ## We don't consider this as an error. Just someone else acquired the advisory lock + debug "skip performWriteQuery because the advisory lock is acquired by other" + return ok() + debug "protected query ended with error", error = $error return err("protected query ended with error:" & $error) @@ -968,22 +975,44 @@ proc addPartition( let partitionName = "messages_" & fromInSec & "_" & untilInSec + ## Create the partition table but not attach it yet to the main table let createPartitionQuery = - "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " & - "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');" + "CREATE TABLE IF NOT EXISTS " & partitionName & + " (LIKE messages INCLUDING DEFAULTS INCLUDING CONSTRAINTS);" (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr: if error.contains("already exists"): debug "skip create new partition as it already exists: ", skipped_error = $error return ok() - if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK): - debug "skip create new partition because the advisory lock is acquired by other" - return ok() - - ## for any different error, just consider it return err(fmt"error adding partition [{partitionName}]: " & $error) + ## Add constraint to the partition table so that EXCLUSIVE ACCESS is not performed when + ## the partition is attached to the main table. + let constraintName = partitionName & "_by_range_check" + let addTimeConstraintQuery = + "ALTER TABLE " & partitionName & " ADD CONSTRAINT " & constraintName & + " CHECK ( storedAt >= " & fromInNanoSec & " AND storedAt < " & untilInNanoSec & " );" + + (await self.performWriteQueryWithLock(addTimeConstraintQuery)).isOkOr: + return err(fmt"error creating constraint [{partitionName}]: " & $error) + + ## Attaching the new created table as a new partition. That does not require EXCLUSIVE ACCESS. + let attachPartitionQuery = + "ALTER TABLE messages ATTACH PARTITION " & partitionName & " FOR VALUES FROM (" & + fromInNanoSec & ") TO (" & untilInNanoSec & ");" + + (await self.performWriteQueryWithLock(attachPartitionQuery)).isOkOr: + return err(fmt"error attaching partition [{partitionName}]: " & $error) + + ## Dropping the check constraint as it was only necessary to prevent full scan, + ## and EXCLUSIVE ACCESS, to the whole messages table, when the new partition was attached. + let dropConstraint = + "ALTER TABLE " & partitionName & " DROP CONSTRAINT " & constraintName & ";" + + (await self.performWriteQueryWithLock(dropConstraint)).isOkOr: + return err(fmt"error dropping constraint [{partitionName}]: " & $error) + debug "new partition added", query = createPartitionQuery self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)