From b46226fca0fbc5730cbf3c9899e33fac8b817796 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Tue, 7 May 2024 23:42:01 +0200
Subject: [PATCH] postgres_driver: delete partitions in time retention policy (#2679)

---
 .../postgres_driver/postgres_driver.nim | 122 ++++++++++++++------
 1 file changed, 87 insertions(+), 35 deletions(-)

diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
index ee492fa7b..8bfc0187f 100644
--- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
+++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -741,16 +741,6 @@ method getNewestMessageTimestamp*(
 
   return ok(Timestamp(intRes.get()))
 
-method deleteMessagesOlderThanTimestamp*(
-    s: PostgresDriver, ts: Timestamp
-): Future[ArchiveDriverResult[void]] {.async.} =
-  let execRes =
-    await s.writeConnPool.pgQuery("DELETE FROM messages WHERE storedAt < " & $ts)
-  if execRes.isErr():
-    return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
-
-  return ok()
-
 method deleteOldestMessagesNotWithinLimit*(
     s: PostgresDriver, limit: int
 ): Future[ArchiveDriverResult[void]] {.async.} =
@@ -953,6 +943,75 @@ proc getTableSize*(
 
   return ok(tableSize)
 
+proc removePartition(
+    self: PostgresDriver, partitionName: string
+): Future[ArchiveDriverResult[void]] {.async.} =
+  ## Detaches `partitionName` from the `messages` table and then drops it.
+  ## Returns an error, without touching the partition manager, as soon as the
+  ## DETACH or the DROP query fails.
+  ##
+  ## NOTE(review): on success this calls `removeOldestPartitionName()`, i.e.
+  ## presumably the manager's oldest entry, whatever `partitionName` was given.
+  ## The callers visible in this patch pass the oldest partition's name; confirm
+  ## that this still holds before reusing this proc elsewhere.
+
+  ## The size is only used in the final log line, hence a failure to retrieve it
+  ## is tolerated and an empty string is logged instead.
+  let partSizeRes = await self.getTableSize(partitionName)
+  let partSize = if partSizeRes.isOk(): partSizeRes.get() else: ""
+
+  ## Detach and remove the partition concurrently to not block the parent table (messages)
+  let detachPartitionQuery =
+    "ALTER TABLE messages DETACH PARTITION " & partitionName & " CONCURRENTLY;"
+  debug "removePartition", query = detachPartitionQuery
+  (await self.performWriteQuery(detachPartitionQuery)).isOkOr:
+    return err(fmt"error in {detachPartitionQuery}: " & $error)
+
+  ## Drop the partition
+  let dropPartitionQuery = "DROP TABLE " & partitionName
+  debug "removePartition drop partition", query = dropPartitionQuery
+  (await self.performWriteQuery(dropPartitionQuery)).isOkOr:
+    return err(fmt"error in {dropPartitionQuery}: " & $error)
+
+  debug "removed partition", partition_name = partitionName, partition_size = partSize
+  self.partitionMngr.removeOldestPartitionName()
+
+  return ok()
+
+proc removePartitionsOlderThan(
+    self: PostgresDriver, tsInNanoSec: Timestamp
+): Future[ArchiveDriverResult[void]] {.async.} =
+  ## Removes the oldest partitions, one at a time, until the oldest remaining
+  ## one contains the given timestamp. That partition is kept.
+  ## Returns an error if the oldest partition cannot be obtained or if a
+  ## removal fails.
+  ##
+  ## NOTE(review): the loop only stops on a partition that contains the
+  ## timestamp. If the timestamp precedes every existing partition, this
+  ## presumably drops all of them and then returns an error - confirm how
+  ## `containsMoment` behaves and whether callers can hit that case.
+
+  ## The timestamp arrives in nanoseconds and is compared in seconds.
+  let tsInSec = Timestamp(float(tsInNanoSec) / 1_000_000_000)
+
+  var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
+    return err("could not get oldest partition in removePartitionsOlderThan: " & $error)
+
+  while not oldestPartition.containsMoment(tsInSec):
+    (await self.removePartition(oldestPartition.getName())).isOkOr:
+      return err("issue in removePartitionsOlderThan: " & $error)
+
+    ## removePartition already called removeOldestPartitionName(), so this is
+    ## expected to return the next candidate.
+    oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
+      return err(
+        "could not get partition in removePartitionsOlderThan in while loop: " & $error
+      )
+
+  ## We reached the partition that contains the target timestamp and we don't
+  ## want to remove it.
+  return ok()
+
 proc removeOldestPartition(
     self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
 ): Future[ArchiveDriverResult[void]] {.async.} =
@@ -971,31 +1030,7 @@ proc removeOldestPartition(
     debug "Skipping to remove the current partition"
     return ok()
 
-  var partSize = ""
-  let partSizeRes = await self.getTableSize(oldestPartition.getName())
-  if partSizeRes.isOk():
-    partSize = partSizeRes.get()
-
-  ## In the following lines is where the partition removal happens.
-  ## Detach and remove the partition concurrently to not block the parent table (messages)
-  let detachPartitionQuery =
-    "ALTER TABLE messages DETACH PARTITION " & oldestPartition.getName() &
-    " CONCURRENTLY;"
-  debug "removeOldestPartition", query = detachPartitionQuery
-  (await self.performWriteQuery(detachPartitionQuery)).isOkOr:
-    return err(fmt"error in {detachPartitionQuery}: " & $error)
-
-  ## Drop the partition
-  let dropPartitionQuery = "DROP TABLE " & oldestPartition.getName()
-  debug "removeOldestPartition drop partition", query = dropPartitionQuery
-  (await self.performWriteQuery(dropPartitionQuery)).isOkOr:
-    return err(fmt"error in {dropPartitionQuery}: " & $error)
-
-  debug "removed partition",
-    partition_name = oldestPartition.getName(), partition_size = partSize
-  self.partitionMngr.removeOldestPartitionName()
-
-  return ok()
+  return await self.removePartition(oldestPartition.getName())
 
 proc containsAnyPartition*(self: PostgresDriver): bool =
   return not self.partitionMngr.isEmpty()
@@ -1079,3 +1114,20 @@ proc getCurrentVersion*(
     return err("error in getMessagesCount: " & $error)
 
   return ok(res)
+
+method deleteMessagesOlderThanTimestamp*(
+    s: PostgresDriver, tsNanoSec: Timestamp
+): Future[ArchiveDriverResult[void]] {.async.} =
+  ## Deletes the messages whose `storedAt` is lower than `tsNanoSec`.
+  ##
+  ## First of all, let's remove the older partitions so that we can reduce
+  ## the database size.
+  (await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
+    return err("error while removing older partitions: " & $error)
+
+  ## Then delete the remaining older rows with a regular DELETE statement.
+  let deleteQuery = "DELETE FROM messages WHERE storedAt < " & $tsNanoSec
+  (await s.writeConnPool.pgQuery(deleteQuery)).isOkOr:
+    return err("error in deleteMessagesOlderThanTimestamp: " & $error)
+
+  return ok()