mirror of https://github.com/waku-org/nwaku.git
postgres_driver: delete partitions in time retention policy (#2679)
This commit is contained in:
parent
82f95999cd
commit
b46226fca0
|
@ -741,16 +741,6 @@ method getNewestMessageTimestamp*(
|
|||
|
||||
return ok(Timestamp(intRes.get()))
|
||||
|
||||
method deleteMessagesOlderThanTimestamp*(
|
||||
s: PostgresDriver, ts: Timestamp
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
let execRes =
|
||||
await s.writeConnPool.pgQuery("DELETE FROM messages WHERE storedAt < " & $ts)
|
||||
if execRes.isErr():
|
||||
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
|
||||
|
||||
return ok()
|
||||
|
||||
method deleteOldestMessagesNotWithinLimit*(
|
||||
s: PostgresDriver, limit: int
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
@ -953,6 +943,54 @@ proc getTableSize*(
|
|||
|
||||
return ok(tableSize)
|
||||
|
||||
proc removePartition(
|
||||
self: PostgresDriver, partitionName: string
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
var partSize = ""
|
||||
let partSizeRes = await self.getTableSize(partitionName)
|
||||
if partSizeRes.isOk():
|
||||
partSize = partSizeRes.get()
|
||||
|
||||
## Detach and remove the partition concurrently to not block the parent table (messages)
|
||||
let detachPartitionQuery =
|
||||
"ALTER TABLE messages DETACH PARTITION " & partitionName & " CONCURRENTLY;"
|
||||
debug "removeOldestPartition", query = detachPartitionQuery
|
||||
(await self.performWriteQuery(detachPartitionQuery)).isOkOr:
|
||||
return err(fmt"error in {detachPartitionQuery}: " & $error)
|
||||
|
||||
## Drop the partition
|
||||
let dropPartitionQuery = "DROP TABLE " & partitionName
|
||||
debug "removeOldestPartition drop partition", query = dropPartitionQuery
|
||||
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
|
||||
return err(fmt"error in {dropPartitionQuery}: " & $error)
|
||||
|
||||
debug "removed partition", partition_name = partitionName, partition_size = partSize
|
||||
self.partitionMngr.removeOldestPartitionName()
|
||||
|
||||
return ok()
|
||||
|
||||
proc removePartitionsOlderThan(
|
||||
self: PostgresDriver, tsInNanoSec: Timestamp
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Removes old partitions that don't contain the specified timestamp
|
||||
|
||||
let tsInSec = Timestamp(float(tsInNanoSec) / 1_000_000_000)
|
||||
|
||||
var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
|
||||
return err("could not get oldest partition in removePartitionOlderThan: " & $error)
|
||||
|
||||
while not oldestPartition.containsMoment(tsInSec):
|
||||
(await self.removePartition(oldestPartition.getName())).isOkOr:
|
||||
return err("issue in removePartitionsOlderThan: " & $error)
|
||||
|
||||
oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
|
||||
return err(
|
||||
"could not get partition in removePartitionOlderThan in while loop: " & $error
|
||||
)
|
||||
|
||||
## We reached the partition that contains the target timestamp plus don't want to remove it
|
||||
return ok()
|
||||
|
||||
proc removeOldestPartition(
|
||||
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
@ -971,31 +1009,7 @@ proc removeOldestPartition(
|
|||
debug "Skipping to remove the current partition"
|
||||
return ok()
|
||||
|
||||
var partSize = ""
|
||||
let partSizeRes = await self.getTableSize(oldestPartition.getName())
|
||||
if partSizeRes.isOk():
|
||||
partSize = partSizeRes.get()
|
||||
|
||||
## In the following lines is where the partition removal happens.
|
||||
## Detach and remove the partition concurrently to not block the parent table (messages)
|
||||
let detachPartitionQuery =
|
||||
"ALTER TABLE messages DETACH PARTITION " & oldestPartition.getName() &
|
||||
" CONCURRENTLY;"
|
||||
debug "removeOldestPartition", query = detachPartitionQuery
|
||||
(await self.performWriteQuery(detachPartitionQuery)).isOkOr:
|
||||
return err(fmt"error in {detachPartitionQuery}: " & $error)
|
||||
|
||||
## Drop the partition
|
||||
let dropPartitionQuery = "DROP TABLE " & oldestPartition.getName()
|
||||
debug "removeOldestPartition drop partition", query = dropPartitionQuery
|
||||
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
|
||||
return err(fmt"error in {dropPartitionQuery}: " & $error)
|
||||
|
||||
debug "removed partition",
|
||||
partition_name = oldestPartition.getName(), partition_size = partSize
|
||||
self.partitionMngr.removeOldestPartitionName()
|
||||
|
||||
return ok()
|
||||
return await self.removePartition(oldestPartition.getName())
|
||||
|
||||
proc containsAnyPartition*(self: PostgresDriver): bool =
|
||||
return not self.partitionMngr.isEmpty()
|
||||
|
@ -1079,3 +1093,16 @@ proc getCurrentVersion*(
|
|||
return err("error in getMessagesCount: " & $error)
|
||||
|
||||
return ok(res)
|
||||
|
||||
method deleteMessagesOlderThanTimestamp*(
|
||||
s: PostgresDriver, tsNanoSec: Timestamp
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## First of all, let's remove the older partitions so that we can reduce
|
||||
## the database size.
|
||||
(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
|
||||
return err("error while removing older partitions: " & $error)
|
||||
|
||||
(await s.writeConnPool.pgQuery("DELETE FROM messages WHERE storedAt < " & $tsNanoSec)).isOkOr:
|
||||
return err("error in deleteMessagesOlderThanTimestamp: " & $error)
|
||||
|
||||
return ok()
|
||||
|
|
Loading…
Reference in New Issue